/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */

package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.jvm.internal.Ref.ObjectRef
import kotlin.math.*
import kotlin.random.*

/**
 * Coroutine scheduler (pool of shared threads) whose primary target is to distribute dispatched coroutines
 * over worker threads, including both CPU-intensive and blocking tasks, in the most efficient manner.
 *
 * The current scheduler implementation has two optimization targets:
 * * Efficiency in the face of communication patterns (e.g. actors communicating via channel).
 * * Dynamic resizing to support blocking calls without re-dispatching coroutines to a separate "blocking" thread pool.
 *
 * ### Structural overview
 *
 * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
 * [maxPoolSize] lazily created threads to execute blocking tasks.
 * Every worker has a local queue in addition to the global scheduler queue,
 * and the global queue has priority over local queues to avoid starvation of externally-submitted
 * (e.g. from the Android UI thread) tasks.
 * Work-stealing is implemented on top of these queues to provide
 * even load distribution and the illusion of a centralized run queue.
 *
 * ### Scheduling policy
 *
 * When a coroutine is dispatched from within a scheduler worker, it is placed into the head of the worker's run queue.
 * If the head is not empty, the task from the head is moved to the tail. Though this is an unfair scheduling policy,
 * it effectively couples communicating coroutines into one and eliminates the scheduling latency
 * that arises from placing tasks at the end of the queue.
 * Placing the former head at the tail is necessary to preserve semi-FIFO order; otherwise the queue degenerates into a stack.
 * When a coroutine is dispatched from an external thread, it is put into the global queue.
 * The original idea of a single-slot LIFO buffer comes from the Go runtime scheduler by D. Vyukov.
 * It was proven to be "fair enough", performant, and generally well accepted, and it initially was a significant
 * source of inspiration for this coroutine scheduler.
 *
 * ### Work stealing and affinity
 *
 * To provide an even distribution of tasks, a worker tries to steal tasks from other workers' queues
 * before parking when its local queue is empty.
 * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen
 * only if it is stale enough, based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
 * For this purpose, a monotonic global clock is used, and every task is marked with its submission time.
 * This approach shows outstanding results when coroutines are cooperative,
 * but as a downside, the scheduler now depends on a high-resolution global clock,
 * which may limit scalability on NUMA machines. Tasks from the LIFO buffer can be stolen on a regular basis.
 *
 * ### Thread management
 * One of the hardest parts of the scheduler is decentralized management of the threads with progress guarantees
 * similar to those of regular centralized executors.
 * The state of the threads consists of the [controlState] and [parkedWorkersStack] fields.
 * The former field incorporates the number of created threads, CPU tokens, and blocking tasks
 * that require thread compensation,
 * while the latter represents an intrusive versioned Treiber stack of idle workers.
 * When a worker cannot find any work, it first adds itself to the stack,
 * then re-scans the queues to avoid missing signals, and then attempts to park
 * with an additional rendezvous against unnecessary parking.
 * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
 * (to be uncounted when additional work is signalled) and parks for that duration.
 *
 * When a new task arrives in the scheduler (whether to a local or the global queue),
 * either an idle worker is signalled, or an attempt is made to create a new worker.
 * (Only [corePoolSize] workers can be created for regular CPU tasks.)
 *
 * ### Support for blocking tasks
 * The scheduler also supports the notion of [blocking][TASK_PROBABLY_BLOCKING] tasks.
 * When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
 * addition to the core pool size, so at any given moment it has [corePoolSize] threads (potentially not yet created)
 * to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
 * "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
 * When a worker encounters a blocking task, it basically hands off its permit to another thread (not directly though) to
 * keep the invariant "the scheduler always has at least min(pending CPU tasks, core pool size)
 * and at most core pool size threads to execute CPU tasks".
 * To avoid overprovision, workers without a CPU permit are allowed to scan [globalBlockingQueue]
 * and steal **only** blocking tasks from other workers.
 *
 * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
 * End users do not have access to the scheduler directly and can dispatch blocking tasks only via
 * [LimitingDispatcher], which controls the concurrency level with its own mechanism.
 */
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equal to core pool size $corePoolSize"
        }
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }

    @JvmField
    val globalCpuQueue = GlobalQueue()
    @JvmField
    val globalBlockingQueue = GlobalQueue()

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }

    /**
     * The stack of parked workers.
     * Every worker registers itself in the stack before parking (if it was not previously registered),
     * so it can be signalled when new tasks arrive.
     * This is a form of intrusive garbage-free Treiber stack where [Worker] itself is a stack node.
     *
     * The stack is better than a queue (even with contention on the top) because it unparks threads
     * in most-recently-used order, improving both performance and locality.
     * Moreover, it decreases thread thrashing: if the pool has n threads when only n / 2 are required,
     * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
     *
     * This long value consists of version bits with [PARKED_VERSION_MASK]
     * and top worker thread index bits with [PARKED_INDEX_MASK].
     */
    private val parkedWorkersStack = atomic(0L)
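
    /*
     * Illustrative sketch (not part of the scheduler): how the packed stack word splits into
     * its two fields. The locals `top`, `index`, and `stamp` below are hypothetical.
     *
     *     val top: Long = parkedWorkersStack.value
     *     val index = (top and PARKED_INDEX_MASK).toInt() // array index of the top worker, 0 when empty
     *     val stamp = top and PARKED_VERSION_MASK         // ABA-protection version, bumped by PARKED_VERSION_INC
     *     // push/pop always write `updVersion or updIndex.toLong()` in a single CAS, so a
     *     // concurrent operation that reuses the same index still changes the whole word.
     */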

    /**
     * Updates the index of the worker at the top of [parkedWorkersStack].
     * It always updates the version to ensure interference with the [parkedWorkersStackPop] operation
     * that might have already decided to put this index at the top.
     *
     * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
     */
    fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = if (index == oldIndex) {
                if (newIndex == 0) {
                    parkedWorkersStackNextIndex(worker)
                } else {
                    newIndex
                }
            } else {
                index // no change to index, but update version
            }
            if (updIndex < 0) return@loop // retry
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
        }
    }

    /**
     * Pushes the worker into [parkedWorkersStack].
     * It does nothing if this worker is already physically linked to the stack.
     * This method is invoked only from the worker thread itself.
     * This invocation always precedes [LockSupport.parkNanos].
     * See [Worker.tryPark].
     *
     * Returns `true` if the worker was added to the stack by this invocation, `false` if it was already
     * registered in the stack.
     */
    fun parkedWorkersStackPush(worker: Worker): Boolean {
        if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
        /*
         * The below loop can be entered only if this worker was not in the stack and, since no other thread
         * can add it to the stack (only the worker itself), this invariant holds while this loop executes.
         */
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = worker.indexInArray
            assert { updIndex != 0 } // only this worker can push itself, cannot be terminated
            worker.nextParkedWorker = workers[index]
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate, which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful push.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true
        }
    }

    /**
     * Pops a worker from [parkedWorkersStack].
     * It can be invoked concurrently from any thread that is looking for help and needs to unpark some worker.
     * This invocation is always followed by an attempt to [LockSupport.unpark] the resulting worker.
     * See [tryUnpark].
     */
    private fun parkedWorkersStackPop(): Worker? {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val worker = workers[index] ?: return null // stack is empty
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = parkedWorkersStackNextIndex(worker)
            if (updIndex < 0) return@loop // retry
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate, which updates the version to make the next CAS fail.
             * A successful CAS of the stack top completes a successful pop.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
                /*
                 * We've just taken the worker out of the stack, but nextParkedWorker is not reset yet, so if a worker is
                 * currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
                 * adding itself again. It does not matter, since we are going to invoke unpark on the thread
                 * that was popped out of parkedWorkersStack anyway.
                 */
                worker.nextParkedWorker = NOT_IN_STACK
                return worker
            }
        }
    }

    /**
     * Finds the next usable index for [parkedWorkersStack]. The problem is that workers can
     * be terminated, at which point their [Worker.indexInArray] becomes zero, so they cannot be
     * put at the top of the stack. In that case we look for the next one.
     *
     * Returns `index >= 0` or `-1` for retry.
     */
    private fun parkedWorkersStackNextIndex(worker: Worker): Int {
        var next = worker.nextParkedWorker
        findNext@ while (true) {
            when {
                next === NOT_IN_STACK -> return -1 // we are too late -- other thread popped this element, retry
                next === null -> return 0 // stack becomes empty
                else -> {
                    val nextWorker = next as Worker
                    val updIndex = nextWorker.indexInArray
                    if (updIndex != 0) return updIndex // found good index for next worker
                    // Otherwise, this worker was terminated and we cannot put it to top anymore, check next
                    next = nextWorker.nextParkedWorker
                }
            }
        }
    }

    /**
     * State of worker threads.
     * [workers] is a dynamic array of lazily created workers up to [maxPoolSize] workers.
     * [createdWorkers] is the count of already created workers (a worker with an index less than [createdWorkers] exists).
     * [blockingTasks] is the count of pending (either in the queue or being executed) blocking tasks.
     *
     * The workers array is also used as a lock for the workers' creation and termination sequence.
     *
     * **NOTE**: `workers[0]` is always `null` (never used, works as a sentinel value), so
     * workers are 1-indexed, the code path in [Worker.trySteal] is a bit faster, and the index swap during termination
     * works properly.
     *
     * Initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
     * applications.
     */
    @JvmField
    val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)

    /**
     * The `Long` value describing the state of workers in this pool.
     * Currently, includes created, CPU-acquired, and blocking workers, each occupying [BLOCKING_SHIFT] bits.
     *
     * State layout (highest to lowest bits):
     * | --- number of cpu permits, 22 bits --- | --- number of blocking tasks, 21 bits --- | --- number of created threads, 21 bits --- |
     */
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)

    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
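
    /*
     * Worked example (illustrative, not part of the scheduler): with corePoolSize = 4 the initial
     * control state packs 4 CPU permits and nothing else, and the decoders above recover each field.
     *
     *     val state = 4L shl CPU_PERMITS_SHIFT   // 4 permits, 0 blocking tasks, 0 created threads
     *     check(availableCpuPermits(state) == 4)
     *     check(blockingTasks(state) == 0)
     *     check(createdWorkers(state) == 0)
     */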

    // Guarded by synchronization
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())

    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)

    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }

    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)

    // This is used as a "stop signal" for close and shutdown functions
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

    companion object {
        // A symbol to mark workers that are not in parkedWorkersStack
        @JvmField
        val NOT_IN_STACK = Symbol("NOT_IN_STACK")

        // Worker ctl states
        private const val PARKED = -1
        private const val CLAIMED = 0
        private const val TERMINATED = 1

        // Masks of control state
        private const val BLOCKING_SHIFT = 21 // 2M threads max
        private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
        private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
        private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
        private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

        internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used
        internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

        // Masks of parkedWorkersStack
        private const val PARKED_INDEX_MASK = CREATED_MASK
        private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
        private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
    }

    override fun execute(command: Runnable) = dispatch(command)

    override fun close() = shutdown(10_000L)

    // Shuts down current scheduler and waits until all work is done and all threads are stopped.
    fun shutdown(timeout: Long) {
        // atomically set termination flag which is checked when workers are added or removed
        if (!_isTerminated.compareAndSet(false, true)) return
        // make sure we are not waiting for the current thread
        val currentWorker = currentWorker()
        // Capture # of created workers that cannot change anymore (mind the synchronized block!)
        val created = synchronized(workers) { createdWorkers }
        // Shutdown all workers with the only exception of the current thread
        for (i in 1..created) {
            val worker = workers[i]!!
            if (worker !== currentWorker) {
                while (worker.isAlive) {
                    LockSupport.unpark(worker)
                    worker.join(timeout)
                }
                val state = worker.state
                assert { state === WorkerState.TERMINATED } // Expected TERMINATED state
                worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
            }
        }
        // Make sure no more work is added to GlobalQueue from anywhere
        globalBlockingQueue.close()
        globalCpuQueue.close()
        // Finish processing tasks from globalQueue and/or from this worker's local queue
        while (true) {
            val task = currentWorker?.findTask(true)
                ?: globalCpuQueue.removeFirstOrNull()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: break
            runSafely(task)
        }
        // Shutdown current thread
        currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
        // check & cleanup state
        assert { availableCpuPermits == corePoolSize }
        parkedWorkersStack.value = 0L
        controlState.value = 0L
    }

    /**
     * Dispatches execution of a runnable [block] with a hint to the scheduler about whether
     * this [block] may execute blocking operations (IO, system calls, locking primitives, etc.)
     *
     * [taskContext] -- concurrency context of the given [block].
     * [tailDispatch] -- whether this [dispatch] call is the last action the (presumably) worker thread does in its current task.
     * If `true`, then the task will be dispatched in a FIFO manner and no additional workers will be requested,
     * but only if the current thread is a corresponding worker thread.
     * Note that the caller cannot be sure that it is being executed on a worker thread for the following reasons:
     * * [CoroutineStart.UNDISPATCHED]
     * * Concurrent [close] that effectively shuts down the worker thread.
     */
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        val isBlockingTask = task.isBlocking
        // Invariant: we increment the counter **before** publishing the task,
        // so the executing thread can safely decrement the number of blocking tasks
        val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (isBlockingTask) {
            // Use state snapshot to better estimate the number of running threads
            signalBlockingWork(stateSnapshot, skipUnpark = skipUnpark)
        } else {
            if (skipUnpark) return
            signalCpuWork()
        }
    }
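
    /*
     * Usage sketch (illustrative, not part of the scheduler): how a dispatcher built on top of this
     * pool would typically forward work here. `pool` and `myBlockingContext` are hypothetical; only
     * `dispatch`, `TaskContext`, and `NonBlockingContext` come from this file and its siblings.
     *
     *     val pool = CoroutineScheduler(corePoolSize = 4, maxPoolSize = 128)
     *     pool.dispatch(Runnable { /* CPU-bound work */ })                       // CPU task, default context
     *     pool.dispatch(Runnable { /* IO */ }, taskContext = myBlockingContext)  // counted as a blocking task
     *     pool.dispatch(Runnable { /* next step */ }, tailDispatch = true)       // FIFO hand-off from a worker
     */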

    fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return TaskImpl(block, nanoTime, taskContext)
    }

    // NB: should only be called from the 'dispatch' method due to the blocking tasks increment
    private fun signalBlockingWork(stateSnapshot: Long, skipUnpark: Boolean) {
        if (skipUnpark) return
        if (tryUnpark()) return
        // Use state snapshot to avoid accidental thread overprovision
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try unpark again in case there was a race between permit release and parking
    }

    fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        /*
         * We check how many threads are there to handle non-blocking work,
         * and create one more if there are not enough of them.
         */
        if (cpuWorkers < corePoolSize) {
            val newCpuWorkers = createNewWorker()
            // If we've created the first cpu worker and corePoolSize > 1 then create
            // one more (second) cpu worker, so that stealing between them is operational
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            if (newCpuWorkers > 0) return true
        }
        return false
    }

    private fun tryUnpark(): Boolean {
        while (true) {
            val worker = parkedWorkersStackPop() ?: return false
            if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
                LockSupport.unpark(worker)
                return true
            }
        }
    }

    /**
     * Returns the number of CPU workers after this function (including the new worker),
     * `0` if no worker was created, or `-1` if the scheduler was terminated.
     */
    private fun createNewWorker(): Int {
        val worker: Worker
        return synchronized(workers) {
            // Make sure we're not trying to resurrect a terminated scheduler
            if (isTerminated) return -1
            val state = controlState.value
            val created = createdWorkers(state)
            val blocking = blockingTasks(state)
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            // Double check for overprovision
            if (cpuWorkers >= corePoolSize) return 0
            if (created >= maxPoolSize) return 0
            // start & register new worker, commit index only after successful creation
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            /*
             * 1) Claim the slot (under a lock) by the newly created worker
             * 2) Make it observable by incrementing the created workers count
             * 3) Only then start the worker, otherwise it may miss its own creation
             */
            worker = Worker(newIndex)
            workers.setSynchronized(newIndex, worker)
            require(newIndex == incrementCreatedWorkers())
            cpuWorkers + 1
        }.also { worker.start() } // Start worker when the lock is released to reduce contention, see #3652
    }

    /**
     * Returns `null` if the task was successfully added or an instance of the
     * task that was not added or was replaced (thus should be added to the global queue).
     */
    private fun Worker?.submitToLocalQueue(task: Task, tailDispatch: Boolean): Task? {
        if (this == null) return task
        /*
         * This worker could have been already terminated from this thread by close/shutdown and it should not
         * accept any more tasks into its local queue.
         */
        if (state === WorkerState.TERMINATED) return task
        // Do not add CPU tasks to the local queue if we are not able to execute them
        if (task.mode == TASK_NON_BLOCKING && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = tailDispatch)
    }

    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

    /**
     * Returns a string identifying the state of this scheduler for nicer debugging.
     * Note that this method is not atomic and represents a rough state of the pool.
     *
     * State of the queues:
     * b for blocking, c for CPU, d for dormant.
     * E.g. [1b, 1b, 2c, 1d] means that the pool has
     * two blocking workers with queue size 1, one worker with a CPU permit and queue size 2,
     * and one dormant (executing its local queue before parking) worker with queue size 1.
     */
    override fun toString(): String {
        var parkedWorkers = 0
        var blockingWorkers = 0
        var cpuWorkers = 0
        var dormant = 0
        var terminated = 0
        val queueSizes = arrayListOf<String>()
        for (index in 1 until workers.currentLength()) {
            val worker = workers[index] ?: continue
            val queueSize = worker.localQueue.size
            when (worker.state) {
                WorkerState.PARKING -> ++parkedWorkers
                WorkerState.BLOCKING -> {
                    ++blockingWorkers
                    queueSizes += queueSize.toString() + "b" // Blocking
                }
                WorkerState.CPU_ACQUIRED -> {
                    ++cpuWorkers
                    queueSizes += queueSize.toString() + "c" // CPU
                }
                WorkerState.DORMANT -> {
                    ++dormant
                    if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Dormant
                }
                WorkerState.TERMINATED -> ++terminated
            }
        }
        val state = controlState.value
        return "$schedulerName@$hexAddress[" +
            "Pool Size {" +
            "core = $corePoolSize, " +
            "max = $maxPoolSize}, " +
            "Worker States {" +
            "CPU = $cpuWorkers, " +
            "blocking = $blockingWorkers, " +
            "parked = $parkedWorkers, " +
            "dormant = $dormant, " +
            "terminated = $terminated}, " +
            "running workers queues = $queueSizes, " +
            "global CPU queue size = ${globalCpuQueue.size}, " +
            "global blocking queue size = ${globalBlockingQueue.size}, " +
            "Control State {" +
            "created workers= ${createdWorkers(state)}, " +
            "blocking tasks = ${blockingTasks(state)}, " +
            "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
            "}]"
    }

    fun runSafely(task: Task) {
        try {
            task.run()
        } catch (e: Throwable) {
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, e)
        } finally {
            unTrackTask()
        }
    }

    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
        }

        // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
        @Volatile // volatile for push/pop operations into parkedWorkersStack
        var indexInArray = 0
            set(index) {
                name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                field = index
            }

        constructor(index: Int) : this() {
            indexInArray = index
        }

        inline val scheduler get() = this@CoroutineScheduler

        @JvmField
        val localQueue: WorkQueue = WorkQueue()

        /**
         * Slot that is used to steal tasks into, to avoid re-adding them
         * to the local queue. See [trySteal].
         */
        private val stolenTask: ObjectRef<Task?> = ObjectRef()

        /**
         * Worker state. **Updated only by this worker thread**.
         * By default, a worker is in the DORMANT state in the case when it was created but all CPU tokens or tasks were taken.
         * Is used locally by the worker to maintain its own invariants.
         */
        @JvmField
        var state = WorkerState.DORMANT

        /**
         * Worker control state responsible for worker claiming, parking, and termination.
         * List of states:
         * [PARKED] -- worker is parked and can self-terminate after a termination deadline.
         * [CLAIMED] -- worker is claimed by an external submitter.
         * [TERMINATED] -- worker is terminated and no longer usable.
         */
        val workerCtl = atomic(CLAIMED)

        /**
         * It is set to the termination deadline when the worker starts parking in [park], and it is reset
         * when a task is found. It serves as protection against spurious wakeups of parkNanos.
         */
        private var terminationDeadline = 0L

        /**
         * Reference to the next worker in the [parkedWorkersStack].
         * It may be `null` if there is no next parked worker.
         * This reference is set to [NOT_IN_STACK] when the worker is physically not in the stack.
         */
        @Volatile
        var nextParkedWorker: Any? = NOT_IN_STACK

        /*
         * The delay until at least one task in another worker's queue becomes stealable.
         */
        private var minDelayUntilStealableTaskNs = 0L

        private var rngState = Random.nextInt()

        /**
         * Tries to acquire a CPU token if the worker doesn't have one.
         * @return whether the worker acquired (or already had) a CPU token
         */
        private fun tryAcquireCpuPermit(): Boolean = when {
            state == WorkerState.CPU_ACQUIRED -> true
            this@CoroutineScheduler.tryAcquireCpuPermit() -> {
                state = WorkerState.CPU_ACQUIRED
                true
            }
            else -> false
        }

        /**
         * Releases the CPU token if the worker has one and changes its state to [newState].
         * Returns `true` if the CPU permit was returned to the pool.
         */
        fun tryReleaseCpu(newState: WorkerState): Boolean {
            val previousState = state
            val hadCpu = previousState == WorkerState.CPU_ACQUIRED
            if (hadCpu) releaseCpuPermit()
            if (previousState != newState) state = newState
            return hadCpu
        }

        override fun run() = runWorker()

        @JvmField
        var mayHaveLocalTasks = false

        private fun runWorker() {
            var rescanned = false
            while (!isTerminated && state != WorkerState.TERMINATED) {
                val task = findTask(mayHaveLocalTasks)
                // Task found. Execute and repeat
                if (task != null) {
                    rescanned = false
                    minDelayUntilStealableTaskNs = 0L
                    executeTask(task)
                    continue
                } else {
                    mayHaveLocalTasks = false
                }
                /*
                 * No tasks were found:
                 * 1) Either at least one of the workers has a stealable task in its FIFO-buffer with a stealing deadline.
                 *    Then its deadline is stored in [minDelayUntilStealableTask]
                 * // '2)' can be found below
                 *
                 * Then just park for that duration (ditto re-scanning).
                 * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
                 * excess unparks and managing the "one unpark per signalling" invariant become infeasible,
                 * so instead we resolve it with a "spinning via scans" mechanism.
                 * NB: this short potential parking does not interfere with `tryUnpark`
                 */
                if (minDelayUntilStealableTaskNs != 0L) {
                    if (!rescanned) {
                        rescanned = true
                    } else {
                        rescanned = false
                        tryReleaseCpu(WorkerState.PARKING)
                        interrupted()
                        LockSupport.parkNanos(minDelayUntilStealableTaskNs)
                        minDelayUntilStealableTaskNs = 0L
                    }
                    continue
                }
                /*
                 * 2) Or no tasks are available: it is time to park and, potentially, shut down the thread.
                 * The worker adds itself to the stack of parked workers, re-scans all the queues
                 * to avoid missing a wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting new tasks.
                 */
                tryPark()
            }
            tryReleaseCpu(WorkerState.TERMINATED)
        }

        /**
         * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
         * This is a fine-tailored method for a specific use-case not expected to be used widely.
         */
        fun runSingleTask(): Long {
            val stateSnapshot = state
            val isCpuThread = state == WorkerState.CPU_ACQUIRED
            val task = if (isCpuThread) {
                findCpuTask()
            } else {
                findBlockingTask()
            }
            if (task == null) {
                if (minDelayUntilStealableTaskNs == 0L) return -1L
                return minDelayUntilStealableTaskNs
            }
            runSafely(task)
            if (!isCpuThread) decrementBlockingTasks()
            assert { state == stateSnapshot }
            return 0L
        }

        fun isIo() = state == WorkerState.BLOCKING

        // Counterpart to "tryUnpark"
        private fun tryPark() {
            if (!inStack()) {
                parkedWorkersStackPush(this)
                return
            }
            workerCtl.value = PARKED // Update value once
            /*
             * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
             * prevents the following race:
             *
             * - T2 scans the queue, adds itself to the stack, goes to rescan
             * - T2 suspends in 'workerCtl.value = PARKED' line
             * - T1 pops T2 from the stack, claims workerCtl, suspends
             * - T2 fails 'while (inStack())' check, goes to full rescan
             * - T2 adds itself to the stack, parks
             * - T1 unparks T2, bails out with success
             * - T2 unparks and loops in 'while (inStack())'
             */
            while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
                if (isTerminated || state == WorkerState.TERMINATED) break
                tryReleaseCpu(WorkerState.PARKING)
                interrupted() // Cleanup interruptions
                park()
            }
        }

        private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

        private fun executeTask(task: Task) {
            val taskMode = task.mode
            idleReset(taskMode)
            beforeTask(taskMode)
            runSafely(task)
            afterTask(taskMode)
        }

        private fun beforeTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            // Always notify about new work when releasing CPU-permit to execute some blocking task
            if (tryReleaseCpu(WorkerState.BLOCKING)) {
                signalCpuWork()
            }
        }

        private fun afterTask(taskMode: Int) {
            if (taskMode == TASK_NON_BLOCKING) return
            decrementBlockingTasks()
            val currentState = state
            // Shutdown sequence of blocking dispatcher
            if (currentState !== WorkerState.TERMINATED) {
                assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
                state = WorkerState.DORMANT
            }
        }

        /*
         * Marsaglia xorshift RNG with period 2^32-1 for work stealing purposes.
         * ThreadLocalRandom cannot be used to support Android and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks
         */
        fun nextInt(upperBound: Int): Int {
            var r = rngState
            r = r xor (r shl 13)
            r = r xor (r shr 17)
            r = r xor (r shl 5)
            rngState = r
            val mask = upperBound - 1
            // Fast path for power of two bound
            if (mask and upperBound == 0) {
                return r and mask
            }
            return (r and Int.MAX_VALUE) % upperBound
        }
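
        /*
         * Illustrative note (not part of the scheduler): the power-of-two fast path above works because
         * for upperBound = 2^k the expression `mask and upperBound` is 0, and `r and mask` keeps the low k bits.
         *
         *     // upperBound = 8: mask = 7 (0b0111), 7 and 8 == 0, result is r and 0b0111, i.e. in 0..7
         *     // upperBound = 6: mask = 5 (0b0101), 5 and 6 == 4 != 0, so the slower modulo path is taken
         */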

        private fun park() {
            // set termination deadline the first time we are here (it is reset in idleReset)
            if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
            // actually park
            LockSupport.parkNanos(idleWorkerKeepAliveNs)
            // try to terminate when we are idle past the termination deadline
            // note that the comparison is written like this to protect against potential nanoTime wraparound
            if (System.nanoTime() - terminationDeadline >= 0) {
                terminationDeadline = 0L // if the attempt to terminate the worker fails, we'd extend the deadline again
                tryTerminateWorker()
            }
        }
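
        /*
         * Worked example (illustrative, not part of the scheduler) of why the subtraction-based
         * comparison above survives a nanoTime wraparound while a direct `>=` would not:
         *
         *     val deadline = Long.MAX_VALUE - 10L   // deadline set just before the clock wraps
         *     val now = deadline + 20L              // overflows to Long.MIN_VALUE + 9
         *     check(now - deadline >= 0)            // true: the wrapped difference is 20
         *     check(now < deadline)                 // a direct comparison would wrongly say "not expired"
         */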

        /**
         * Stops execution of the current thread and removes it from [createdWorkers].
         */
        private fun tryTerminateWorker() {
            synchronized(workers) {
                // Make sure we're not racing with the termination of the scheduler
                if (isTerminated) return
                // Someone else terminated, bail out
                if (createdWorkers <= corePoolSize) return
                /*
                 * See tryUnpark for state reasoning.
                 * If this CAS fails, then we were successfully unparked by another worker and cannot terminate.
                 */
                if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
                /*
                 * At this point this thread is no longer considered as usable for scheduling.
                 * We need multi-step choreography to reindex workers.
                 *
                 * 1) Read the current worker's index and reset it to zero.
                 */
                val oldIndex = indexInArray
                indexInArray = 0
                /*
                 * Now this worker cannot become the top of parkedWorkersStack, but it can
                 * still be at the stack top via oldIndex.
                 *
                 * 2) Update the top of the stack if it was pointing to oldIndex and make sure no
                 *    pending push/pop operation that might have already retrieved oldIndex could complete.
                 */
                parkedWorkersStackTopUpdate(this, oldIndex, 0)
                /*
                 * 3) Move the last worker into the index in the array that was previously occupied by this worker,
                 *    if the last worker was a different one (sic!).
                 */
                val lastIndex = decrementCreatedWorkers()
                if (lastIndex != oldIndex) {
                    val lastWorker = workers[lastIndex]!!
                    workers.setSynchronized(oldIndex, lastWorker)
                    lastWorker.indexInArray = oldIndex
                    /*
                     * Now lastWorker is available at both indices in the array, but it can
                     * still be at the stack top via its lastIndex.
                     *
                     * 4) Update the top of the stack lastIndex -> oldIndex and make sure no
                     *    pending push/pop operation that might have already retrieved lastIndex could complete.
                     */
                    parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
                }
                /*
                 * 5) It is safe to clear the reference from the workers array now.
                 */
                workers.setSynchronized(lastIndex, null)
            }
            state = WorkerState.TERMINATED
        }

        // It is invoked by this worker when it finds a task
        private fun idleReset(mode: Int) {
            terminationDeadline = 0L // reset deadline for termination
            if (state == WorkerState.PARKING) {
                assert { mode == TASK_PROBABLY_BLOCKING }
                state = WorkerState.BLOCKING
            }
        }

        fun findTask(mayHaveLocalTasks: Boolean): Task? {
            if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
            /*
             * If we can't acquire a CPU permit, attempt to find a blocking task:
             * * Check if our queue has one (maybe mixed in with CPU tasks)
             * * Poll global and try steal
             */
            return findBlockingTask()
        }

        // NB: ONLY for runSingleTask method
        private fun findBlockingTask(): Task? {
            return localQueue.pollBlocking()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: trySteal(STEAL_BLOCKING_ONLY)
        }

        // NB: ONLY for runSingleTask method
        private fun findCpuTask(): Task? {
            return localQueue.pollCpu()
                ?: globalCpuQueue.removeFirstOrNull()
                ?: trySteal(STEAL_CPU_ONLY)
        }

        private fun findAnyTask(scanLocalQueue: Boolean): Task? {
            /*
             * Anti-starvation mechanism: probabilistically poll either the local
             * or the global queue to ensure progress for both external and internal tasks.
             */
            if (scanLocalQueue) {
                val globalFirst = nextInt(2 * corePoolSize) == 0
                if (globalFirst) pollGlobalQueues()?.let { return it }
                localQueue.poll()?.let { return it }
                if (!globalFirst) pollGlobalQueues()?.let { return it }
            } else {
                pollGlobalQueues()?.let { return it }
            }
            return trySteal(STEAL_ANY)
        }
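
        /*
         * Worked example (illustrative, not part of the scheduler): with corePoolSize = 4,
         * nextInt(2 * corePoolSize) == 0 holds with probability 1/8, so roughly one in eight
         * scans checks the global queues before the local one -- enough to guarantee progress
         * for external tasks without giving up the locality benefits of the local queue.
         */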

        private fun pollGlobalQueues(): Task? {
            if (nextInt(2) == 0) {
                globalCpuQueue.removeFirstOrNull()?.let { return it }
                return globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()?.let { return it }
                return globalCpuQueue.removeFirstOrNull()
            }
        }

        private fun trySteal(stealingMode: StealingMode): Task? {
            val created = createdWorkers
            // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
            if (created < 2) {
                return null
            }

            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
                ++currentIndex
                if (currentIndex > created) currentIndex = 1
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
                    if (stealResult == TASK_STOLEN) {
                        val result = stolenTask.element
                        stolenTask.element = null
                        return result
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }
    }

    enum class WorkerState {
        /**
         * Has a CPU token and either executes a [TASK_NON_BLOCKING] task or tries to find one.
         */
        CPU_ACQUIRED,

        /**
         * Executing a task with [TASK_PROBABLY_BLOCKING].
         */
        BLOCKING,

        /**
         * Currently parked.
         */
        PARKING,

        /**
         * Tries to execute its local work and then goes to an infinite sleep as a no-longer-needed worker.
         */
        DORMANT,

        /**
         * Terminal state, will no longer be used.
         */
        TERMINATED
    }
}

/**
 * Checks if the thread is part of a thread pool that supports coroutines.
 * This function is needed for integration with BlockHound.
 */
@JvmName("isSchedulerWorker")
internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker

/**
 * Checks if the thread is running a CPU-bound task.
 * This function is needed for integration with BlockHound.
 */
@JvmName("mayNotBlock")
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
    thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED
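
/*
 * Usage sketch (illustrative; assumes BlockHound's builder API from `reactor.blockhound`):
 * a debug integration can combine [mayNotBlock] with BlockHound so that blocking calls
 * on CPU-acquired workers are detected and reported.
 *
 *     BlockHound.builder()
 *         .nonBlockingThreadPredicate { p -> p.or { thread -> mayNotBlock(thread) } }
 *         .install()
 */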