package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.jvm.internal.Ref.ObjectRef
import kotlin.math.*

/**
 * Coroutine scheduler (pool of shared threads) with a primary target to distribute dispatched coroutines
 * over worker threads, including both CPU-intensive and potentially blocking tasks, in the most efficient manner.
 *
 * The current scheduler implementation has two optimization targets:
 * - Efficiency in the face of communication patterns (e.g. actors communicating via channel).
 * - Dynamic thread state and resizing to schedule blocking calls without re-dispatching coroutines to a separate "blocking" thread pool.
 *
 * ### Structural overview
 *
 * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
 * [maxPoolSize] lazily created threads to execute blocking tasks.
 * The scheduler has two global queues -- one for CPU tasks and one for blocking tasks.
 * These queues are used for tasks that are submitted externally (from threads not belonging to the scheduler)
 * and as overflow buffers for thread-local queues.
 *
 * Every worker has a local queue in addition to the global scheduler queues.
 * The queue to pick the task from is selected randomly to avoid starvation of tasks submitted to either
 * the local queue or the global queue.
 * Work-stealing is implemented on top of these queues to provide even load distribution and an illusion of a centralized run queue.
 *
 * ### Scheduling policy
 *
 * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of the worker's run queue.
 * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
 * it effectively couples communicating coroutines into one and eliminates the scheduling latency
 * that arises from placing tasks at the end of the queue.
 * Placing the former head at the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates into a stack.
 * When a coroutine is dispatched from an external thread, it's put into the global queue.
 * The original idea with a single-slot LIFO buffer comes from the Golang runtime scheduler by D. Vyukov.
 * It has proven to be "fair enough", performant and generally well accepted, and it was initially a significant
 * source of inspiration for the coroutine scheduler.
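 *
 * The following is a minimal, single-threaded sketch of the ordering this policy produces. `LifoSlotQueue`,
 * `add`, `addFair` and `poll` are hypothetical names. The sketch models only the order in which tasks are
 * taken, not the lock-free, stealable buffer used by the actual local queue.
 *
```kotlin
// Hypothetical model: a single-slot LIFO buffer in front of a FIFO queue.
class LifoSlotQueue<T : Any> {
    private var slot: T? = null        // the "head" of the run queue
    private val fifo = ArrayDeque<T>() // everything behind the head, in FIFO order

    // Non-fair add: the new task takes the slot and the former head moves to the tail.
    fun add(task: T) {
        val previous = slot
        slot = task
        if (previous != null) fifo.addLast(previous)
    }

    // Fair add: the task goes straight to the tail and the slot is left untouched.
    fun addFair(task: T) = fifo.addLast(task)

    // The slot is always taken first, then the FIFO part.
    fun poll(): T? {
        val head = slot ?: return fifo.removeFirstOrNull()
        slot = null
        return head
    }
}
```
 * Adding `a`, `b` and `c` with `add` yields the polling order `c`, `a`, `b`. The most recently dispatched task
 * runs next, and the displaced heads keep FIFO order instead of being polled as a stack.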
 *
 * ### Work stealing and affinity
 *
 * To provide even task distribution, a worker tries to steal tasks from other workers' queues
 * before parking when its local queue is empty.
 * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen
 * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
 * For this purpose, a monotonic global clock is used, and every task has an associated submission time.
 * This approach shows outstanding results when coroutines are cooperative,
 * but as a downside, the scheduler now depends on a high-resolution global clock,
 * which may limit scalability on NUMA machines.
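 *
 * The staleness rule can be sketched as a pure function. `TimedTask`, `stealDelayNs` and `stealResolutionNs`
 * are hypothetical names. The 100 000 ns resolution is an assumed value chosen for illustration;
 * the scheduler itself uses [WORK_STEALING_TIME_RESOLUTION_NS].
 *
```kotlin
// Assumed resolution for this sketch only; not read from the scheduler's configuration.
val stealResolutionNs = 100_000L

class TimedTask(val submissionTimeNs: Long)

// Returns 0 if the victim's task is stale enough to be stolen now; otherwise returns how long
// the thief would have to wait (e.g. by parking) before the task becomes stealable.
fun stealDelayNs(task: TimedTask, nowNs: Long): Long {
    val staleness = nowNs - task.submissionTimeNs
    return if (staleness >= stealResolutionNs) 0L else stealResolutionNs - staleness
}
```
 * With these values, a task submitted at time 0 may be stolen at 100 000 ns. At 40 000 ns the thief
 * would instead be told to wait another 60 000 ns.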
 *
 * ### Thread management
 *
 * One of the hardest parts of the scheduler is the decentralized management of threads with progress guarantees
 * similar to those of regular centralized executors.
 * The state of the threads consists of the [controlState] and [parkedWorkersStack] fields.
 * The former field incorporates the number of created threads, CPU-tokens and blocking tasks
 * that require thread compensation,
 * while the latter represents an intrusive versioned Treiber stack of idle workers.
 * When a worker cannot find any work, it first adds itself to the stack,
 * then re-scans the queue to avoid missing signals and then attempts to park
 * with an additional rendezvous against unnecessary parking.
 * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
 * (to be uncounted when additional work is signalled) and parks for such duration.
 *
 * When a new task arrives at the scheduler (whether in a local or a global queue),
 * either an idle worker is signalled, or an attempt is made to create a new worker.
 * (Only [corePoolSize] workers can be created for regular CPU tasks.)
 *
 * ### Support for blocking tasks
 *
 * The scheduler also supports the notion of [blocking][Task.isBlocking] tasks.
 * When executing or enqueuing blocking tasks, the scheduler notifies or creates a worker
 * in addition to the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
 * available to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
 * "CPU permits" -- [corePoolSize] special tokens that allow an arbitrary worker to execute and steal CPU-bound tasks.
 * When a worker encounters a blocking task, it releases its permit to the scheduler to
 * keep an invariant "scheduler always has at least min(pending CPU tasks, core pool size)
 * and at most core pool size threads to execute CPU tasks".
 * To avoid overprovisioning, workers without a CPU permit are allowed to scan [globalBlockingQueue]
 * and steal **only** blocking tasks from other workers, which imposes a non-trivial complexity on queue management.
 *
 * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
 * End users do not have access to the scheduler directly and can dispatch blocking tasks only with
 * [LimitingDispatcher], which controls the concurrency level with its own mechanism.
 */
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
        }
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }

    @JvmField
    val globalCpuQueue = GlobalQueue()

    @JvmField
    val globalBlockingQueue = GlobalQueue()

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }

    /**
     * The stack of parked workers.
     * Every worker registers itself in a stack before parking (if it was not previously registered),
     * so it can be signalled when new tasks arrive.
     * This is a form of intrusive garbage-free Treiber stack where [Worker] also is a stack node.
     *
     * The stack is better than a queue (even with the contention on top) because it unparks threads
     * in most-recently used order, improving both performance and locality.
     * Moreover, it decreases thread thrashing: if the pool has n threads when only n / 2 are required,
     * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
     *
     * This long value consists of version bits with [PARKED_VERSION_MASK]
     * and top worker thread index bits with [PARKED_INDEX_MASK].
     */
    private val parkedWorkersStack = atomic(0L)

    /**
     * Updates the index of the worker at the top of [parkedWorkersStack].
     * It always updates the version so that it interferes with (makes the CAS fail in) a concurrent
     * [parkedWorkersStackPop] operation that might have already decided to put this index at the top.
     *
     * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
     */
    fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = if (index == oldIndex) {
                if (newIndex == 0) {
                    parkedWorkersStackNextIndex(worker)
                } else {
                    newIndex
                }
            } else {
                index // no change to index, but update version
            }
            if (updIndex < 0) return@loop // retry
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
        }
    }

    /**
     * Pushes worker into [parkedWorkersStack].
     * It does nothing if this worker is already physically linked to the stack.
     * This method is invoked only from the worker thread itself.
     * This invocation always precedes [LockSupport.parkNanos].
     * See [Worker.tryPark].
     *
     * Returns `true` if worker was added to the stack by this invocation, `false` if it was already
     * registered in the stack.
     */
    fun parkedWorkersStackPush(worker: Worker): Boolean {
        if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
        /*
         * The below loop can be entered only if this worker was not in the stack and, since no other thread
         * can add it to the stack (only the worker itself), this invariant holds while this loop executes.
         */
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = worker.indexInArray
            assert { updIndex != 0 } // only this worker can push itself, cannot be terminated
            worker.nextParkedWorker = workers[index]
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate, which updates the version to make the next CAS fail.
             * Successful CAS of the stack top completes successful push.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true
        }
    }

    /**
     * Pops worker from [parkedWorkersStack].
     * It can be invoked concurrently from any thread that is looking for help and needs to unpark some worker.
     * This invocation is always followed by an attempt to [LockSupport.unpark] the resulting worker.
     * See [tryUnpark].
     */
    private fun parkedWorkersStackPop(): Worker? {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val worker = workers[index] ?: return null // stack is empty
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = parkedWorkersStackNextIndex(worker)
            if (updIndex < 0) return@loop // retry
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate, which updates the version to make the next CAS fail.
             * Successful CAS of the stack top completes successful pop.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
                /*
                 * We've just taken the worker out of the stack, but nextParkedWorker is not reset yet, so if a worker is
                 * currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
                 * adding itself again. It does not matter, since we are going to invoke unpark on the thread
                 * that was popped out of parkedWorkersStack anyway.
                 */
                worker.nextParkedWorker = NOT_IN_STACK
                return worker
            }
        }
    }

    /**
     * Finds the next usable index for [parkedWorkersStack]. The problem is that workers can
     * be terminated and their [Worker.indexInArray] becomes zero, so they cannot be
     * put at the top of the stack. In that case we look for the next one.
     *
     * Returns `index >= 0` or `-1` for retry.
     */
    private fun parkedWorkersStackNextIndex(worker: Worker): Int {
        var next = worker.nextParkedWorker
        findNext@ while (true) {
            when {
                next === NOT_IN_STACK -> return -1 // we are too late -- other thread popped this element, retry
                next === null -> return 0 // stack becomes empty
                else -> {
                    val nextWorker = next as Worker
                    val updIndex = nextWorker.indexInArray
                    if (updIndex != 0) return updIndex // found good index for next worker
                    // Otherwise, this worker was terminated and we cannot put it to top anymore, check next
                    next = nextWorker.nextParkedWorker
                }
            }
        }
    }
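
    /*
     * Sketch (hypothetical, not part of this scheduler): the versioned-top technique used by the functions above,
     * reduced to an index-based Treiber stack over an array of "next" links. `VersionedIndexStack` is an
     * illustrative name. The 21-bit index width mirrors `PARKED_INDEX_MASK`, and index 0 means "empty",
     * like the `workers[0]` sentinel. Unlike `parkedWorkersStackPush`, this sketch does not check whether
     * a node is already linked, so the caller must never push a node that is already in the stack.
     *
```kotlin
import java.util.concurrent.atomic.AtomicIntegerArray
import java.util.concurrent.atomic.AtomicLong

// Hypothetical index-based Treiber stack with a version counter packed into the top word.
class VersionedIndexStack(capacity: Int) {
    private val indexMask = (1L shl 21) - 1              // low 21 bits: index of the top node
    private val versionInc = 1L shl 21                   // high bits: version, bumped on every update
    private val top = AtomicLong(0L)                     // index 0 encodes an empty stack
    private val next = AtomicIntegerArray(capacity + 1)  // next[i] = index of the node below i

    // Keeps the (incremented) version bits of the old top and installs a new index.
    private fun pack(oldTop: Long, newIndex: Int): Long =
        ((oldTop + versionInc) and indexMask.inv()) or newIndex.toLong()

    fun push(index: Int) {
        require(index in 1..(next.length() - 1)) { "index out of range: $index" }
        while (true) {
            val current = top.get()
            next.set(index, (current and indexMask).toInt()) // link to the current top
            if (top.compareAndSet(current, pack(current, index))) return
        }
    }

    fun pop(): Int? {
        while (true) {
            val current = top.get()
            val index = (current and indexMask).toInt()
            if (index == 0) return null // empty stack
            if (top.compareAndSet(current, pack(current, next.get(index)))) return index
        }
    }
}
```
     * Every successful CAS bumps the version. A thread holding a stale snapshot of `top` therefore fails its CAS
     * even if the same index has meanwhile returned to the top (the ABA problem).
     */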

    /**
     * State of worker threads.
     * [workers] is a dynamic array of lazily created workers up to [maxPoolSize] workers.
     * [createdWorkers] is the count of already created workers (workers with indices 1..[createdWorkers] exist).
     * [blockingTasks] is the count of pending (either in the queue or being executed) blocking tasks.
     *
     * Workers array is also used as a lock for workers' creation and termination sequence.
     *
     * **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value), so
     * workers are 1-indexed, code path in [Worker.trySteal] is a bit faster and index swap during termination
     * works properly.
     *
     * Initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
     * applications.
     */
    @JvmField
    val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)

    /**
     * The `Long` value describing the state of workers in this pool.
     * Currently, it includes the number of created workers, the number of pending blocking tasks and the number of
     * available CPU permits. The first two counters occupy [BLOCKING_SHIFT] bits each; CPU permits use the remaining high bits.
     *
     * State layout (highest to lowest bits):
     * | --- number of cpu permits, 22 bits ---  | --- number of blocking tasks, 21 bits ---  | --- number of created threads, 21 bits ---  |
     */
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)

    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()

    // Guarded by synchronization
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())

    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)

    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }

    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
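
    /*
     * Sketch (hypothetical, not part of this scheduler): a stand-alone model of the packed control state and
     * the permit CAS loop above. It is written against java.util.concurrent.atomic.AtomicLong instead of atomicfu.
     * `PackedControlState` is an illustrative name, and the 21-bit field width mirrors `BLOCKING_SHIFT`.
     *
```kotlin
import java.util.concurrent.atomic.AtomicLong

// Hypothetical model: created threads in bits 0-20, blocking tasks in bits 21-41, CPU permits from bit 42 up.
class PackedControlState(corePoolSize: Int) {
    private val shift = 21
    private val fieldMask = (1L shl shift) - 1
    private val permitsShift = shift * 2
    private val state = AtomicLong(corePoolSize.toLong() shl permitsShift) // all permits available

    fun created(): Int = (state.get() and fieldMask).toInt()
    fun blocking(): Int = ((state.get() shr shift) and fieldMask).toInt()
    fun permits(): Int = ((state.get() shr permitsShift) and fieldMask).toInt()

    fun incrementCreated(): Int = (state.incrementAndGet() and fieldMask).toInt()
    fun incrementBlocking() { state.addAndGet(1L shl shift) }

    // Takes one CPU permit only if one is available; retries when another thread changed the state.
    fun tryAcquirePermit(): Boolean {
        while (true) {
            val current = state.get()
            if (((current shr permitsShift) and fieldMask) == 0L) return false
            if (state.compareAndSet(current, current - (1L shl permitsShift))) return true
        }
    }

    fun releasePermit() { state.addAndGet(1L shl permitsShift) }
}
```
     * Packing all three counters into one word lets a single read observe them consistently. This is how
     * `tryCreateWorker` can compute `created - blocking` from one snapshot.
     */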

    // This is used as a "stop signal" for close and shutdown functions
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

    companion object {
        // A symbol to mark workers that are not in parkedWorkersStack
        @JvmField
        val NOT_IN_STACK = Symbol("NOT_IN_STACK")

        // Worker ctl states
        private const val PARKED = -1
        private const val CLAIMED = 0
        private const val TERMINATED = 1

        // Masks of control state
        private const val BLOCKING_SHIFT = 21 // 2M threads max
        private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
        private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
        private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
        private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

        internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used
        internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

        // Masks of parkedWorkersStack
        private const val PARKED_INDEX_MASK = CREATED_MASK
        private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
        private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
    }

    override fun execute(command: Runnable) = dispatch(command)

    override fun close() = shutdown(10_000L)

    // Shuts down the current scheduler and waits until all work is done and all threads are stopped.
    fun shutdown(timeout: Long) {
        // atomically set termination flag which is checked when workers are added or removed
        if (!_isTerminated.compareAndSet(false, true)) return
        // make sure we are not waiting for the current thread
        val currentWorker = currentWorker()
        // Capture # of created workers that cannot change anymore (mind the synchronized block!)
        val created = synchronized(workers) { createdWorkers }
        // Shut down all workers with the only exception of the current thread
        for (i in 1..created) {
            val worker = workers[i]!!
            if (worker !== currentWorker) {
                // Note: this is java.lang.Thread.getState() of type java.lang.Thread.State
                while (worker.getState() != Thread.State.TERMINATED) {
                    LockSupport.unpark(worker)
                    worker.join(timeout)
                }
                // Note: this is CoroutineScheduler.Worker.state of type CoroutineScheduler.WorkerState
                assert { worker.state === WorkerState.TERMINATED } // Expected TERMINATED state
                worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
            }
        }
        // Make sure no more work is added to GlobalQueue from anywhere
        globalBlockingQueue.close()
        globalCpuQueue.close()
        // Finish processing tasks from globalQueue and/or from this worker's local queue
        while (true) {
            val task = currentWorker?.findTask(true)
                ?: globalCpuQueue.removeFirstOrNull()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: break
            runSafely(task)
        }
        // Shut down the current thread
        currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
        // check & cleanup state
        assert { availableCpuPermits == corePoolSize }
        parkedWorkersStack.value = 0L
        controlState.value = 0L
    }

    /**
     * Dispatches execution of a runnable [block] with a hint to a scheduler whether
     * this [block] may execute blocking operations (IO, system calls, locking primitives etc.)
     *
     * [taskContext] -- concurrency context of given [block].
     * [fair] -- whether this [dispatch] call is fair.
     * If `true` then the task will be dispatched in a FIFO manner; this is used for [yield].
     * Note that the caller cannot be sure that it is being executed on a worker thread, for the following reasons:
     *   - [CoroutineStart.UNDISPATCHED]
     *   - Concurrent [close] that effectively shuts down the worker thread.
     */
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        val isBlockingTask = task.isBlocking
        // Invariant: we increment counter **before** publishing the task
        // so executing thread can safely decrement the number of blocking tasks
        val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, fair)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        // Checking 'task' instead of 'notAdded' is completely okay
        if (isBlockingTask) {
            // Use state snapshot to better estimate the number of running threads
            signalBlockingWork(stateSnapshot)
        } else {
            signalCpuWork()
        }
    }

    fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return block.asTask(nanoTime, taskContext)
    }

    // NB: should only be called from 'dispatch' method due to blocking tasks increment
    private fun signalBlockingWork(stateSnapshot: Long) {
        if (tryUnpark()) return
        // Use state snapshot to avoid accidental thread overprovision
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try unpark again in case there was race between permit release and parking
    }

    fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        /*
         * We check how many threads are there to handle non-blocking work,
         * and create one more if we have not enough of them.
         */
        if (cpuWorkers < corePoolSize) {
            val newCpuWorkers = createNewWorker()
            // If we've created the first cpu worker and corePoolSize > 1 then create
            // one more (second) cpu worker, so that stealing between them is operational
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            if (newCpuWorkers > 0) return true
        }
        return false
    }
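
    /*
     * Sketch (hypothetical helper, not part of this scheduler): the creation decision made by `tryCreateWorker`,
     * together with the double check in `createNewWorker`, reduced to a pure function over counter values.
     * `shouldCreateCpuWorker` is an illustrative name.
     *
```kotlin
// Hypothetical: decides whether one more worker may be created to serve CPU-bound tasks.
fun shouldCreateCpuWorker(created: Int, blockingTasks: Int, corePoolSize: Int, maxPoolSize: Int): Boolean {
    // Threads busy with blocking tasks do not count towards the CPU-bound capacity.
    val cpuWorkers = (created - blockingTasks).coerceAtLeast(0)
    return cpuWorkers < corePoolSize && created < maxPoolSize
}
```
     * Take a core size of 4 and a max size of 8. Four created threads with no blocking tasks are enough.
     * Four created threads with two blocking tasks leave room for another CPU worker.
     */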

    private fun tryUnpark(): Boolean {
        while (true) {
            val worker = parkedWorkersStackPop() ?: return false
            if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
                LockSupport.unpark(worker)
                return true
            }
        }
    }
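
    /*
     * Sketch (hypothetical, not part of this scheduler): the claim handshake used by `tryUnpark`. Only one thread
     * can win the PARKED -> CLAIMED CAS, so at most one signaller wakes a given parked worker. Candidates that
     * are already claimed or terminated are skipped. `ModelWorker` and `claimFirstParked` are illustrative
     * names, and a plain deque stands in for `parkedWorkersStackPop`.
     *
```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Same encoding as the worker control states in the companion object.
val PARKED = -1
val CLAIMED = 0
val TERMINATED = 1

class ModelWorker(initialCtl: Int) {
    val ctl = AtomicInteger(initialCtl)
}

// Pops candidates until one is successfully claimed; the caller would then unpark that worker.
fun claimFirstParked(candidates: ArrayDeque<ModelWorker>): ModelWorker? {
    while (true) {
        val worker = candidates.removeFirstOrNull() ?: return null
        if (worker.ctl.compareAndSet(PARKED, CLAIMED)) return worker
    }
}
```
     */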

    /**
     * Returns the number of CPU workers after this function (including the new worker),
     * 0 if no worker was created, or -1 if the scheduler is already terminated.
     */
    private fun createNewWorker(): Int {
        val worker: Worker
        return synchronized(workers) {
            // Make sure we're not trying to resurrect terminated scheduler
            if (isTerminated) return -1
            val state = controlState.value
            val created = createdWorkers(state)
            val blocking = blockingTasks(state)
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            // Double check for overprovision
            if (cpuWorkers >= corePoolSize) return 0
            if (created >= maxPoolSize) return 0
            // start & register new worker, commit index only after successful creation
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            /*
             * 1) Claim the slot (under a lock) by the newly created worker
             * 2) Make it observable by incrementing the created workers count
             * 3) Only then start the worker, otherwise it may miss its own creation
             */
            worker = Worker(newIndex)
            workers.setSynchronized(newIndex, worker)
            require(newIndex == incrementCreatedWorkers())
            cpuWorkers + 1
        }.also { worker.start() } // Start worker when the lock is released to reduce contention, see #3652
    }
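
    /*
     * Sketch (hypothetical, not part of this scheduler): the creation order used by `createNewWorker`. The slot
     * is claimed and the count is published under the lock, and the thread is started only after the lock is
     * released. `MiniPool` and its members are illustrative names, and a plain `Thread` stands in for `Worker`.
     *
```kotlin
class MiniPool(private val maxThreads: Int) {
    private val lock = Any()
    private val threads = arrayOfNulls<Thread>(maxThreads + 1) // index 0 is an unused sentinel
    private var created = 0                                   // guarded by lock

    // Returns the 1-based index of the new thread, or 0 if the pool is full.
    fun tryAddThread(body: Runnable): Int {
        val thread: Thread
        val index = synchronized(lock) {
            if (created >= maxThreads) return 0
            val newIndex = created + 1
            thread = Thread(body, "mini-worker-$newIndex").apply { isDaemon = true }
            threads[newIndex] = thread // 1) claim the slot
            created = newIndex         // 2) publish the new count
            newIndex
        }
        thread.start()                 // 3) start outside the lock to reduce contention
        return index
    }

    fun threadAt(index: Int): Thread? = synchronized(lock) { threads[index] }
}
```
     */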

    /**
     * Returns `null` if the task was successfully added, or the task that was not added or was displaced
     * (and thus should be added to the global queue).
     */
    private fun Worker?.submitToLocalQueue(task: Task, fair: Boolean): Task? {
        if (this == null) return task
        /*
         * This worker could have been already terminated from this thread by close/shutdown and it should not
         * accept any more tasks into its local queue.
         */
        if (state === WorkerState.TERMINATED) return task
        // Do not add CPU tasks to the local queue if we are not able to execute them
        if (!task.isBlocking && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = fair)
    }
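
    /*
     * Sketch (hypothetical, not part of this scheduler): where `dispatch` initially puts a new task, depending on
     * the submitting thread. `WorkerStateModel`, `Destination` and `destinationOf` are illustrative names. The model
     * ignores the task that `localQueue.add` may displace into the global queue and the rejection after shutdown.
     *
```kotlin
// Mirrors the worker states; null stands for a thread that is not a worker of this scheduler.
enum class WorkerStateModel { CPU_ACQUIRED, BLOCKING, PARKING, DORMANT, TERMINATED }

enum class Destination { LOCAL_QUEUE, GLOBAL_CPU_QUEUE, GLOBAL_BLOCKING_QUEUE }

fun destinationOf(isBlocking: Boolean, caller: WorkerStateModel?): Destination {
    val acceptedLocally = caller != null &&
        caller != WorkerStateModel.TERMINATED &&              // terminated workers accept nothing
        !(caller == WorkerStateModel.BLOCKING && !isBlocking) // blocking workers do not take CPU tasks locally
    return when {
        acceptedLocally -> Destination.LOCAL_QUEUE
        isBlocking -> Destination.GLOBAL_BLOCKING_QUEUE
        else -> Destination.GLOBAL_CPU_QUEUE
    }
}
```
     */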
520 
521     private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }
522 
523     /**
524      * Returns a string identifying the state of this scheduler for nicer debugging.
525      * Note that this method is not atomic and represents rough state of pool.
526      *
527      * State of the queues:
528      * b for blocking, c for CPU, r for retiring.
529      * E.g. for [1b, 1b, 2c, 1d] means that pool has
530      * two blocking workers with queue size 1, one worker with CPU permit and queue size 1
531      * and one dormant (executing his local queue before parking) worker with queue size 1.
532      */
533     override fun toString(): String {
534         var parkedWorkers = 0
535         var blockingWorkers = 0
536         var cpuWorkers = 0
537         var dormant = 0
538         var terminated = 0
539         val queueSizes = arrayListOf<String>()
540         for (index in 1 until workers.currentLength()) {
541             val worker = workers[index] ?: continue
542             val queueSize = worker.localQueue.size
543             when (worker.state) {
544                 WorkerState.PARKING -> ++parkedWorkers
545                 WorkerState.BLOCKING -> {
546                     ++blockingWorkers
547                     queueSizes += queueSize.toString() + "b" // Blocking
548                 }
549 
550                 WorkerState.CPU_ACQUIRED -> {
551                     ++cpuWorkers
552                     queueSizes += queueSize.toString() + "c" // CPU
553                 }
554 
555                 WorkerState.DORMANT -> {
556                     ++dormant
557                     if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Retiring
558                 }
559 
560                 WorkerState.TERMINATED -> ++terminated
561             }
562         }
563         val state = controlState.value
564         return "$schedulerName@$hexAddress[" +
565             "Pool Size {" +
566             "core = $corePoolSize, " +
567             "max = $maxPoolSize}, " +
568             "Worker States {" +
569             "CPU = $cpuWorkers, " +
570             "blocking = $blockingWorkers, " +
571             "parked = $parkedWorkers, " +
572             "dormant = $dormant, " +
573             "terminated = $terminated}, " +
574             "running workers queues = $queueSizes, " +
575             "global CPU queue size = ${globalCpuQueue.size}, " +
576             "global blocking queue size = ${globalBlockingQueue.size}, " +
577             "Control State {" +
578             "created workers= ${createdWorkers(state)}, " +
579             "blocking tasks = ${blockingTasks(state)}, " +
580             "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
581             "}]"
582     }
583 
584     fun runSafely(task: Task) {
585         try {
586             task.run()
587         } catch (e: Throwable) {
588             val thread = Thread.currentThread()
589             thread.uncaughtExceptionHandler.uncaughtException(thread, e)
590         } finally {
591             unTrackTask()
592         }
593     }

    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
            /*
             * `Dispatchers.Default` is used as *the* dispatcher in containerized environments,
             * isolated by their own classloaders. Workers are populated lazily, so we inherit
             * the `Dispatchers.Default` context class loader here instead of using the parent thread's one,
             * in order not to accidentally capture a temporary application classloader.
             */
            contextClassLoader = this@CoroutineScheduler.javaClass.classLoader
        }

        // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
        @Volatile // volatile for push/pop operation into parkedWorkersStack
        var indexInArray = 0
            set(index) {
                name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                field = index
            }

        constructor(index: Int) : this() {
            indexInArray = index
        }

        inline val scheduler get() = this@CoroutineScheduler

        @JvmField
        val localQueue: WorkQueue = WorkQueue()

        /**
         * Slot that stolen tasks are placed into, to avoid re-adding them
         * to the local queue. See [trySteal]
         */
        private val stolenTask: ObjectRef<Task?> = ObjectRef()

        /**
         * Worker state. **Updated only by this worker thread**.
         * A newly created worker starts in the DORMANT state, which covers the case when it was created
         * but all CPU tokens or tasks had already been taken.
         * Is used locally by the worker to maintain its own invariants.
         */
        @JvmField
        var state = WorkerState.DORMANT

        /**
         * Worker control state responsible for worker claiming, parking and termination.
         * List of states:
         * [PARKED] -- worker is parked and can self-terminate after a termination deadline.
         * [CLAIMED] -- worker is claimed by an external submitter.
         * [TERMINATED] -- worker is terminated and no longer usable.
         */
        val workerCtl = atomic(CLAIMED)

        /**
         * It is set to the termination deadline when the worker starts doing [park] and is reset
         * when there is a task. It serves as protection against spurious wakeups of parkNanos.
         */
        private var terminationDeadline = 0L

        /**
         * Reference to the next worker in the [parkedWorkersStack].
         * It may be `null` if there is no next parked worker.
         * This reference is set to [NOT_IN_STACK] when worker is physically not in stack.
         */
        @Volatile
        var nextParkedWorker: Any? = NOT_IN_STACK

        /*
         * The delay until at least one task in other worker queues will become stealable.
         */
        private var minDelayUntilStealableTaskNs = 0L

        /**
         * The state of embedded Marsaglia xorshift random number generator, used for work-stealing purposes.
         * It is initialized with a non-zero seed derived from System.nanoTime().
         */
        private var rngState: Int = run {
            // This could've been Random.nextInt(), but we are shaving an extra initialization cost, see #4051
            val seed = System.nanoTime().toInt()
            // rngState shouldn't be zero, as required for the xorshift algorithm
            if (seed != 0) return@run seed
            42
        }

        /**
         * Tries to acquire CPU token if worker doesn't have one
         * @return whether worker acquired (or already had) CPU token
         */
        private fun tryAcquireCpuPermit(): Boolean = when {
            state == WorkerState.CPU_ACQUIRED -> true
            this@CoroutineScheduler.tryAcquireCpuPermit() -> {
                state = WorkerState.CPU_ACQUIRED
                true
            }

            else -> false
        }

        /**
         * Releases CPU token if worker has any and changes state to [newState].
         * Returns `true` if CPU permit was returned to the pool
         */
        fun tryReleaseCpu(newState: WorkerState): Boolean {
            val previousState = state
            val hadCpu = previousState == WorkerState.CPU_ACQUIRED
            if (hadCpu) releaseCpuPermit()
            if (previousState != newState) state = newState
            return hadCpu
        }
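        /*
         A simplified model of the permit handling in tryAcquireCpuPermit and tryReleaseCpu.
         Assumptions: the hypothetical `PermitPool` keeps available permits in its own
         AtomicInteger (the scheduler tracks them inside its control state), and `SketchState`
         mirrors [WorkerState]. It shows the two properties these methods rely on: acquiring is
         idempotent for a worker that already holds a permit, and releasing reports whether a
         permit actually went back to the pool.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Mirrors WorkerState for this sketch only.
enum class SketchState { CPU_ACQUIRED, BLOCKING, PARKING, DORMANT, TERMINATED }

// Hypothetical pool of CPU permits backed by a separate counter.
class PermitPool(corePoolSize: Int) {
    private val available = AtomicInteger(corePoolSize)

    // CAS loop: take a permit only if one is left.
    fun tryAcquire(): Boolean {
        while (true) {
            val current = available.get()
            if (current == 0) return false
            if (available.compareAndSet(current, current - 1)) return true
        }
    }

    fun release() {
        available.incrementAndGet()
    }

    fun availablePermits(): Int = available.get()
}

class SketchWorker(private val pool: PermitPool) {
    var state = SketchState.DORMANT
        private set

    // A worker that already holds a permit does not take a second one.
    fun tryAcquireCpuPermit(): Boolean = when {
        state == SketchState.CPU_ACQUIRED -> true
        pool.tryAcquire() -> {
            state = SketchState.CPU_ACQUIRED
            true
        }
        else -> false
    }

    // Returns true only if a permit was actually returned to the pool.
    fun tryReleaseCpu(newState: SketchState): Boolean {
        val hadCpu = state == SketchState.CPU_ACQUIRED
        if (hadCpu) pool.release()
        state = newState
        return hadCpu
    }
}
```
        */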

        override fun run() = runWorker()

        @JvmField
        var mayHaveLocalTasks = false

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                /*
                 * No tasks were found:
                 * 1) Either at least one of the workers has a stealable task in its FIFO-buffer with a stealing deadline.
                 *    Then its deadline is stored in [minDelayUntilStealableTaskNs]
                 * // '2)' can be found below
                 *
                 * Then just park for that duration (ditto re-scanning).
                 * This could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
                 * but the alternative (excess unparks and maintaining a "one unpark per signalling" invariant) is infeasible,
                 * so instead we resolve it with the "spinning via scans" mechanism.
                 * NB: this short potential parking does not interfere with `tryUnpark`
                 */
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                /*
                 * 2) Or no tasks available, time to park and, potentially, shut down the thread.
                 * Adds itself to the stack of parked workers, re-scans all the queues
                 * to avoid missing a wake-up (requestCpuWorker) and either starts executing discovered tasks
                 * or parks itself awaiting new tasks.
                 */
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }
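        /*
         The branching at the bottom of runWorker can be modelled as a pure function of three
         inputs. This is a hypothetical sketch, not scheduler API: `Action` and `decide` are
         illustrative names, and `TryPark` stands in for the call to tryPark(). It makes the
         rescan-before-park rule explicit: with a pending stealable task the worker scans one
         extra time before parking for minDelayUntilStealableTaskNs, and finding a task clears
         the `rescanned` flag.

```kotlin
// Illustrative outcomes of one runWorker iteration.
sealed class Action {
    object Execute : Action()
    object Rescan : Action()
    data class ParkFor(val nanos: Long) : Action()
    object TryPark : Action() // stands in for tryPark()
}

// Returns the chosen action and the new value of the `rescanned` flag.
fun decide(taskFound: Boolean, minDelayNs: Long, rescanned: Boolean): Pair<Action, Boolean> = when {
    taskFound -> Action.Execute to false
    minDelayNs != 0L && !rescanned -> Action.Rescan to true
    minDelayNs != 0L -> Action.ParkFor(minDelayNs) to false
    else -> Action.TryPark to rescanned
}
```
        */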

        /**
         * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
         * This is a fine-tailored method for a specific use-case not expected to be used widely.
         */
        fun runSingleTask(): Long {
            val stateSnapshot = state
            val isCpuThread = state == WorkerState.CPU_ACQUIRED
            val task = if (isCpuThread) {
                findCpuTask()
            } else {
                findBlockingTask()
            }
            if (task == null) {
                if (minDelayUntilStealableTaskNs == 0L) return -1L
                return minDelayUntilStealableTaskNs
            }
            runSafely(task)
            if (!isCpuThread) decrementBlockingTasks()
            assert { state == stateSnapshot }
            return 0L
        }
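        /*
         A sketch of how a caller might interpret the return value of runSingleTask: `0L` means
         a task ran, a positive value is the delay in nanoseconds until some task may become
         stealable, and `-1L` means there was nothing to do. The `drain` helper and its
         `maxParks` bound are hypothetical; the real use case is described in
         [runSingleTaskFromCurrentSystemDispatcher].

```kotlin
import java.util.concurrent.locks.LockSupport

// Hypothetical driver for a runSingleTask-style function.
// Contract: 0L = a task ran, > 0 = nanoseconds until a task may become stealable,
// -1L = nothing to do. Returns how many tasks were executed.
fun drain(runSingleTask: () -> Long, maxParks: Int = 10): Int {
    var executed = 0
    var parks = 0
    while (true) {
        val result = runSingleTask()
        when {
            result == 0L -> executed++
            result > 0L && parks < maxParks -> {
                parks++
                LockSupport.parkNanos(result) // may return early; the loop just retries
            }
            else -> return executed
        }
    }
}
```
        */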

        fun isIo() = state == WorkerState.BLOCKING

        // Counterpart to "tryUnpark"
        private fun tryPark() {
            if (!inStack()) {
                parkedWorkersStackPush(this)
                return
            }
            workerCtl.value = PARKED // Update value once
            /*
             * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
             * prevents the following race:
             *
             * - T2 scans the queue, adds itself to the stack, goes to rescan
             * - T2 suspends in 'workerCtl.value = PARKED' line
             * - T1 pops T2 from the stack, claims workerCtl, suspends
             * - T2 fails 'while (inStack())' check, goes to full rescan
             * - T2 adds itself to the stack, parks
             * - T1 unparks T2, bails out with success
             * - T2 unparks and loops in 'while (inStack())'
             */
            while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
                if (isTerminated || state == WorkerState.TERMINATED) break
                tryReleaseCpu(WorkerState.PARKING)
                interrupted() // Cleanup interruptions
                park()
            }
        }
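        /*
         A sketch of the single-winner handshake on workerCtl. Assumptions: the integer encodings
         of PARKED, CLAIMED and TERMINATED below are hypothetical, and `Ctl` is an illustrative
         wrapper around an AtomicInteger. The parked worker publishes PARKED; a submitter that
         popped it from the parked stack tries PARKED -> CLAIMED (the tryUnpark side), while the
         idle worker itself tries PARKED -> TERMINATED in tryTerminateWorker. Because both are
         compare-and-set operations from the same expected value, exactly one of them succeeds.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical encodings for this sketch only.
const val PARKED = -1
const val CLAIMED = 0
const val TERMINATED = 1

class Ctl {
    val value = AtomicInteger(CLAIMED)

    // Done by the worker itself before parking.
    fun markParked() = value.set(PARKED)

    // Done by a submitter that wants to wake this worker.
    fun tryClaim(): Boolean = value.compareAndSet(PARKED, CLAIMED)

    // Done by the idle worker once its keep-alive deadline has passed.
    fun tryTerminate(): Boolean = value.compareAndSet(PARKED, TERMINATED)
}
```
        */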

        private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

        private fun executeTask(task: Task) {
            terminationDeadline = 0L // reset deadline for termination
            if (state == WorkerState.PARKING) {
                assert { task.isBlocking }
                state = WorkerState.BLOCKING
            }
            if (task.isBlocking) {
                // Always notify about new work when releasing CPU-permit to execute some blocking task
                if (tryReleaseCpu(WorkerState.BLOCKING)) {
                    signalCpuWork()
                }
                runSafely(task)
                decrementBlockingTasks()
                val currentState = state
                // Shutdown sequence of blocking dispatcher
                if (currentState !== WorkerState.TERMINATED) {
                    assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
                    state = WorkerState.DORMANT
                }
            } else {
                runSafely(task)
            }
        }
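        /*
         A simplified model of the bookkeeping in executeTask for a blocking task, assuming
         hypothetical counters in place of the scheduler's shared state: `availablePermits` for
         the CPU permit pool, `signals` for calls to signalCpuWork(), and `blockingTasks` for the
         counter that decrementBlockingTasks() maintains. A CPU-holding worker hands its permit
         back before running blocking code and signals only when a permit was actually released;
         afterwards it ends up DORMANT. The TERMINATED check of the real method is omitted.

```kotlin
// Hypothetical worker states for this sketch only.
enum class ExecState { CPU_ACQUIRED, BLOCKING, DORMANT }

// Hypothetical model of executeTask's bookkeeping for a single worker.
class ExecModel(initialState: ExecState, pendingBlocking: Int) {
    var state = initialState
        private set
    var availablePermits = 0 // permits returned to the shared pool
        private set
    var signals = 0 // stands in for signalCpuWork()
        private set
    var blockingTasks = pendingBlocking // stands in for the blocking-task counter
        private set

    fun execute(isBlocking: Boolean, body: () -> Unit) {
        if (!isBlocking) {
            body()
            return
        }
        // Give the CPU permit back before blocking and wake another worker,
        // but only if a permit was actually returned.
        if (state == ExecState.CPU_ACQUIRED) {
            availablePermits++
            signals++
        }
        state = ExecState.BLOCKING
        body()
        blockingTasks--
        // After a blocking task the worker holds no permit.
        state = ExecState.DORMANT
    }
}
```
        */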

        /*
         * Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
         * ThreadLocalRandom cannot be used because Android must be supported,
         * and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks.
         */
        fun nextInt(upperBound: Int): Int {
            var r = rngState
            r = r xor (r shl 13)
            r = r xor (r shr 17)
            r = r xor (r shl 5)
            rngState = r
            val mask = upperBound - 1
            // Fast path for power of two bound
            if (mask and upperBound == 0) {
                return r and mask
            }
            return (r and Int.MAX_VALUE) % upperBound
        }
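        /*
         The same generator written as a standalone class, as a sketch for experimenting with the
         bounded-output logic. One deliberate difference, labelled in the code: the middle step
         uses the textbook logical shift `ushr`, as in Marsaglia's xorshift32, whereas the method
         above uses the arithmetic shift `shr`. The zero-seed fallback to 42 mirrors the rngState
         initializer, since zero is a fixed point of xorshift.

```kotlin
class XorShift32(seed: Int) {
    // Zero would make every subsequent state zero, so replace it.
    private var state: Int = if (seed != 0) seed else 42

    fun nextInt(upperBound: Int): Int {
        require(upperBound > 0) { "upperBound must be positive" }
        var r = state
        r = r xor (r shl 13)
        r = r xor (r ushr 17) // assumption: logical shift, unlike `shr` above
        r = r xor (r shl 5)
        state = r
        val mask = upperBound - 1
        // Power-of-two bound: keep the low bits.
        if ((mask and upperBound) == 0) return r and mask
        // Otherwise clear the sign bit so the remainder is non-negative.
        return (r and Int.MAX_VALUE) % upperBound
    }
}
```

         For power-of-two bounds the low bits are used directly; for other bounds `%` introduces
         a small modulo bias, presumably acceptable when the value only picks a starting victim.
        */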

        private fun park() {
            // set termination deadline the first time we are here (it is reset in executeTask)
            if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
            // actually park
            LockSupport.parkNanos(idleWorkerKeepAliveNs)
            // try to terminate when we are idle past the termination deadline
            // note that comparison is written like this to protect against potential nanoTime wraparound
            if (System.nanoTime() - terminationDeadline >= 0) {
                terminationDeadline = 0L // if attempt to terminate worker fails we'd extend deadline again
                tryTerminateWorker()
            }
        }
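        /*
         park() compares deadlines as `System.nanoTime() - terminationDeadline >= 0` rather than
         `now >= deadline`. A short sketch of why, using synthetic values near Long.MAX_VALUE
         (System.nanoTime() values have an arbitrary origin and may be close to overflow): Long
         arithmetic wraps, so the subtraction still yields the true signed distance, while the
         direct comparison flips once the deadline wraps around.

```kotlin
// Overflow-safe "has the deadline passed?" check, in the same form as park().
fun deadlinePassed(now: Long, deadline: Long): Boolean = now - deadline >= 0

// Naive comparison that gives the wrong answer once the deadline wraps.
fun deadlinePassedNaive(now: Long, deadline: Long): Boolean = now >= deadline
```
        */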

        /**
         * Stops execution of current thread and removes it from [createdWorkers].
         */
        private fun tryTerminateWorker() {
            synchronized(workers) {
                // Make sure we're not racing with termination of the scheduler
                if (isTerminated) return
                // Someone else terminated, bail out
                if (createdWorkers <= corePoolSize) return
                /*
                 * See tryUnpark for state reasoning.
                 * If this CAS fails, then we were successfully unparked by another worker and cannot terminate.
                 */
                if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
                /*
                 * At this point this thread is no longer considered as usable for scheduling.
                 * We need multi-step choreography to reindex workers.
                 *
                 * 1) Read current worker's index and reset it to zero.
                 */
                val oldIndex = indexInArray
                indexInArray = 0
                /*
                 * Now this worker cannot become the top of parkedWorkersStack, but it can
                 * still be at the stack top via oldIndex.
                 *
                 * 2) Update top of stack if it was pointing to oldIndex and make sure no
                 *    pending push/pop operation that might have already retrieved oldIndex could complete.
                 */
                parkedWorkersStackTopUpdate(this, oldIndex, 0)
                /*
                 * 3) Move last worker into an index in array that was previously occupied by this worker,
                 *    if last worker was a different one (sic!).
                 */
                val lastIndex = decrementCreatedWorkers()
                if (lastIndex != oldIndex) {
                    val lastWorker = workers[lastIndex]!!
                    workers.setSynchronized(oldIndex, lastWorker)
                    lastWorker.indexInArray = oldIndex
                    /*
                     * Now lastWorker is available at both indices in the array, but it can
                     * still be at the stack top via its lastIndex
                     *
                     * 4) Update top of stack lastIndex -> oldIndex and make sure no
                     *    pending push/pop operation that might have already retrieved lastIndex could complete.
                     */
                    parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
                }
                /*
                 * 5) It is safe to clear reference from workers array now.
                 */
                workers.setSynchronized(lastIndex, null)
            }
            state = WorkerState.TERMINATED
        }
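        /*
         The re-indexing in steps 1, 3 and 5 is a swap-with-last removal from a dense 1-based
         array. A single-threaded sketch with hypothetical `FakeWorker` and `WorkerArray` types:
         slot 0 is unused, live entries occupy 1..created, and removal moves the last entry into
         the vacated slot. Steps 2 and 4 (the parkedWorkersStack top updates) and all
         synchronization are deliberately left out.

```kotlin
class FakeWorker(val name: String) {
    var indexInArray = 0 // 0 means "not in the array"
}

class WorkerArray(capacity: Int) {
    private val slots = arrayOfNulls<FakeWorker>(capacity + 1) // slot 0 unused
    var created = 0
        private set

    fun add(w: FakeWorker) {
        created++
        slots[created] = w
        w.indexInArray = created
    }

    operator fun get(index: Int): FakeWorker? = slots[index]

    fun remove(w: FakeWorker) {
        val oldIndex = w.indexInArray        // step 1: read and reset the index
        require(oldIndex != 0) { "already removed" }
        w.indexInArray = 0
        val lastIndex = created--            // old count, like decrementCreatedWorkers()
        if (lastIndex != oldIndex) {         // step 3: move the last worker into the hole
            val last = slots[lastIndex]!!
            slots[oldIndex] = last
            last.indexInArray = oldIndex
        }
        slots[lastIndex] = null              // step 5: clear the now-unused slot
    }
}
```
        */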

        fun findTask(mayHaveLocalTasks: Boolean): Task? {
            if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
            /*
             * If we can't acquire a CPU permit, attempt to find a blocking task:
             * - Check if our queue has one (maybe mixed in with CPU tasks)
             * - Poll global and try steal
             */
            return findBlockingTask()
        }

        // NB: used by findTask (when no CPU permit can be acquired) and by runSingleTask
        private fun findBlockingTask(): Task? {
            return localQueue.pollBlocking()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: trySteal(STEAL_BLOCKING_ONLY)
        }

        // NB: ONLY for runSingleTask method
        private fun findCpuTask(): Task? {
            return localQueue.pollCpu()
                ?: globalCpuQueue.removeFirstOrNull()
                ?: trySteal(STEAL_CPU_ONLY)
        }

        private fun findAnyTask(scanLocalQueue: Boolean): Task? {
            /*
             * Anti-starvation mechanism: probabilistically poll either local
             * or global queue to ensure progress for both external and internal tasks.
             */
            if (scanLocalQueue) {
                val globalFirst = nextInt(2 * corePoolSize) == 0
                if (globalFirst) pollGlobalQueues()?.let { return it }
                localQueue.poll()?.let { return it }
                if (!globalFirst) pollGlobalQueues()?.let { return it }
            } else {
                pollGlobalQueues()?.let { return it }
            }
            return trySteal(STEAL_ANY)
        }
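        /*
         A sketch of the polling order chosen by findAnyTask, with the task sources reduced to
         labels. `pollOrder` is a hypothetical helper, and kotlin.random.Random stands in for the
         worker's own nextInt. With probability 1 / (2 * corePoolSize) the global queues go
         first; otherwise the local queue does, and work stealing is always the last resort.
         pollGlobalQueues makes a second, 50/50 choice between the CPU and blocking global
         queues, which this sketch does not model.

```kotlin
import kotlin.random.Random

// Returns the order in which task sources would be consulted.
fun pollOrder(random: Random, corePoolSize: Int, scanLocalQueue: Boolean): List<String> {
    if (!scanLocalQueue) return listOf("global", "steal")
    val globalFirst = random.nextInt(2 * corePoolSize) == 0
    return if (globalFirst) listOf("global", "local", "steal")
    else listOf("local", "global", "steal")
}
```
        */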

        private fun pollGlobalQueues(): Task? {
            if (nextInt(2) == 0) {
                globalCpuQueue.removeFirstOrNull()?.let { return it }
                return globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()?.let { return it }
                return globalCpuQueue.removeFirstOrNull()
            }
        }

        private fun trySteal(stealingMode: StealingMode): Task? {
            val created = createdWorkers
            // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
            if (created < 2) {
                return null
            }

            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
                ++currentIndex
                if (currentIndex > created) currentIndex = 1
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
                    if (stealResult == TASK_STOLEN) {
                        val result = stolenTask.element
                        stolenTask.element = null
                        return result
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }
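        /*
         A sketch of the victim scan in trySteal, with hypothetical names and a hypothetical
         result encoding (`STOLEN`, `NOTHING`, positive delays) standing in for TASK_STOLEN and
         the values returned by the work queue's trySteal. Starting just after a random index, it
         visits each slot in 1..created exactly once, wrapping from `created` back to 1, skips
         the calling worker, returns on the first successful steal, and otherwise reports the
         smallest positive delay (or 0), the value runWorker reads from
         minDelayUntilStealableTaskNs.

```kotlin
// Hypothetical steal-result encoding for this sketch only:
// STOLEN = a task was taken, NOTHING = nothing to steal, positive = nanoseconds
// until the victim's task becomes stealable.
const val STOLEN = -1L
const val NOTHING = -2L

data class StealOutcome(val victim: Int?, val minDelayNs: Long)

// `start` plays the role of nextInt(created), i.e. a value in 0 until created.
fun scanVictims(created: Int, self: Int, start: Int, tryStealFrom: (Int) -> Long): StealOutcome {
    var current = start
    var minDelay = Long.MAX_VALUE
    repeat(created) {
        current++
        if (current > created) current = 1 // wrap around; slot 0 is never used
        if (current != self) {
            val result = tryStealFrom(current)
            if (result == STOLEN) return StealOutcome(current, 0L)
            if (result > 0) minDelay = minOf(minDelay, result)
        }
    }
    return StealOutcome(null, if (minDelay != Long.MAX_VALUE) minDelay else 0L)
}
```
        */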
    }

    enum class WorkerState {
        /**
         * Has CPU token and either executes a task with [Task.isBlocking] `== false` or tries to find one.
         */
        CPU_ACQUIRED,

        /**
         * Executing a task with [Task.isBlocking].
         */
        BLOCKING,

        /**
         * Currently parked.
         */
        PARKING,

        /**
         * Tries to execute its local work and then goes to infinite sleep as a no-longer-needed worker.
         */
        DORMANT,

        /**
         * Terminal state, will no longer be used.
         */
        TERMINATED
    }
}

/**
 * Checks if the thread is part of a thread pool that supports coroutines.
 * This function is needed for integration with BlockHound.
 */
@JvmName("isSchedulerWorker")
internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker

/**
 * Checks if the thread is running a CPU-bound task.
 * This function is needed for integration with BlockHound.
 */
@JvmName("mayNotBlock")
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
    thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED