package kotlinx.coroutines.scheduling

import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.internal.*
import java.io.*
import java.util.concurrent.*
import java.util.concurrent.locks.*
import kotlin.jvm.internal.Ref.ObjectRef
import kotlin.math.*

/**
 * Coroutine scheduler (pool of shared threads) with a primary target to distribute dispatched coroutines
 * over worker threads, including both CPU-intensive and potentially blocking tasks, in the most efficient manner.
 *
 * The current scheduler implementation has two optimization targets:
 * - Efficiency in the face of communication patterns (e.g. actors communicating via channels).
 * - Dynamic thread state and resizing to schedule blocking calls without re-dispatching a coroutine to a separate "blocking" thread pool.
 *
 * ### Structural overview
 *
 * The scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
 * [maxPoolSize] lazily created threads to execute blocking tasks.
 * The scheduler has two global queues -- one for CPU tasks and one for blocking tasks.
 * These queues are used for tasks that are submitted externally (from threads not belonging to the scheduler)
 * and as overflow buffers for thread-local queues.
 *
 * Every worker has a local queue in addition to the global scheduler queues.
 * The queue to pick the next task from is selected randomly to avoid starving tasks submitted
 * to either the local or the global queue.
 * Work-stealing is implemented on top of these queues to provide even load distribution and an illusion of a centralized run queue.
 *
 * ### Scheduling policy
 *
 * When a coroutine is dispatched from within a scheduler worker, it's placed into the head of the worker's run queue.
 * If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
 * it effectively couples communicating coroutines into one and eliminates the scheduling latency
 * that arises from placing tasks at the end of the queue.
 * Moving the former head to the tail is necessary to provide semi-FIFO order; otherwise, the queue degenerates to a stack.
 * When a coroutine is dispatched from an external thread, it's put into the global queue.
 * The original idea with a single-slot LIFO buffer comes from the Go runtime scheduler by D. Vyukov.
 * It has proven to be "fair enough", performant and generally well accepted, and it was initially a significant
 * source of inspiration for the coroutine scheduler.
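 *
 * Below is a minimal, single-threaded sketch of this policy. The `LifoSlotQueue` name is hypothetical.
 * The scheduler's real [WorkQueue] is a concurrent structure that also supports stealing.
 *
```kotlin
// Hypothetical, single-threaded illustration of a single LIFO slot in front of a FIFO buffer.
class LifoSlotQueue<T : Any> {
    private var slot: T? = null            // the single LIFO slot ("head")
    private val buffer = ArrayDeque<T>()   // FIFO buffer behind the slot

    // A local dispatch takes the slot; the previous occupant is moved to the FIFO tail.
    fun add(task: T) {
        val previous = slot
        slot = task
        if (previous != null) buffer.addLast(previous)
    }

    // The slot is served first, then the buffer in submission order.
    fun poll(): T? {
        val head = slot
        if (head != null) {
            slot = null
            return head
        }
        return buffer.removeFirstOrNull()
    }
}
```
 *
 * Adding `a`, `b`, `c` and then polling yields `c`, `a`, `b`. Only the most recently dispatched
 * task jumps ahead, while older tasks keep their FIFO order instead of being served as a stack.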
 *
 * ### Work stealing and affinity
 *
 * To distribute tasks evenly, a worker whose local queue is empty tries to steal tasks from
 * other workers' queues before parking.
 * A non-standard solution is implemented to provide task affinity: a task from the FIFO buffer may be stolen
 * only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
 * For this purpose, a monotonic global clock is used, and every task carries its submission time.
 * This approach shows outstanding results when coroutines are cooperative,
 * but as a downside, the scheduler now depends on a high-resolution global clock,
 * which may limit scalability on NUMA machines.
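 *
 * The staleness rule can be sketched as a pure function. It assumes the submission time and the current
 * time are read from the same monotonic clock. `delayUntilStealableNs` is a hypothetical helper.
 * A non-zero result roughly corresponds to the delay a worker records before a short park.
 *
```kotlin
// Hypothetical helper: how long a thief must wait before a task submitted at `submissionTimeNs`
// becomes stealable. Returns 0 when the task is already stale enough to be stolen.
fun delayUntilStealableNs(submissionTimeNs: Long, nowNs: Long, resolutionNs: Long): Long {
    val age = nowNs - submissionTimeNs
    return if (age >= resolutionNs) 0L else resolutionNs - age
}
```
 *
 * For example, with a resolution of 1000 ns, a task submitted at 1000 ns can be stolen from 2000 ns on.
 * A thief looking at it at 1500 ns is told to come back in 500 ns.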
 *
 * ### Thread management
 *
 * One of the hardest parts of the scheduler is decentralized management of the threads with progress guarantees
 * similar to the regular centralized executors.
 * The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
 * The former field incorporates the number of created threads, CPU tokens and blocking tasks
 * that require thread compensation,
 * while the latter represents an intrusive versioned Treiber stack of idle workers.
 * When a worker cannot find any work, it first adds itself to the stack,
 * then re-scans the queues to avoid missing signals and then attempts to park
 * with an additional rendezvous against unnecessary parking.
 * If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
 * (to be uncounted when additional work is signalled) and parks for that duration.
 *
 * When a new task arrives at the scheduler (whether in a local or a global queue),
 * either an idle worker is signalled, or an attempt is made to create a new worker.
 * (Only [corePoolSize] workers can be created for regular CPU tasks.)
 *
 * ### Support for blocking tasks
 *
 * The scheduler also supports the notion of [blocking][Task.isBlocking] tasks.
 * When executing or enqueuing blocking tasks, the scheduler signals or creates a worker beyond
 * the core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
 * available to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
 * "CPU permits" -- [corePoolSize] special tokens that allow an arbitrary worker to execute and steal CPU-bound tasks.
 * When a worker encounters a blocking task, it releases its permit to the scheduler to
 * keep an invariant "scheduler always has at least min(pending CPU tasks, core pool size)
 * and at most core pool size threads to execute CPU tasks".
 * To avoid overprovisioning, workers without a CPU permit are allowed to scan [globalBlockingQueue]
 * and steal **only** blocking tasks from other workers, which imposes a non-trivial complexity on queue management.
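 *
 * The permit accounting can be sketched with a standalone counter. This is a simplification: the
 * scheduler packs its permits into [controlState] instead. `CpuPermits` is a hypothetical name.
 *
```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical simplification of CPU permits as a standalone atomic counter.
class CpuPermits(corePoolSize: Int) {
    private val available = AtomicInteger(corePoolSize)

    val availablePermits: Int get() = available.get()

    // CAS loop that never lets the number of available permits drop below zero.
    fun tryAcquire(): Boolean {
        while (true) {
            val current = available.get()
            if (current == 0) return false
            if (available.compareAndSet(current, current - 1)) return true
        }
    }

    // Called when a worker switches to a blocking task or parks, so another worker may run CPU tasks.
    fun release() {
        available.incrementAndGet()
    }
}
```
 *
 * With two permits, a third `tryAcquire()` fails until one of the holders calls `release()`.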
 *
 * The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
 * End users do not have access to the scheduler directly and can dispatch blocking tasks only through
 * [LimitingDispatcher], which controls the concurrency level with its own mechanism.
 */
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
    init {
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        }
        require(maxPoolSize >= corePoolSize) {
            "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
        }
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        }
        require(idleWorkerKeepAliveNs > 0) {
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        }
    }

    @JvmField
    val globalCpuQueue = GlobalQueue()

    @JvmField
    val globalBlockingQueue = GlobalQueue()

    private fun addToGlobalQueue(task: Task): Boolean {
        return if (task.isBlocking) {
            globalBlockingQueue.addLast(task)
        } else {
            globalCpuQueue.addLast(task)
        }
    }

    /**
     * The stack of parked workers.
     * Every worker registers itself in a stack before parking (if it was not previously registered),
     * so it can be signalled when new tasks arrive.
     * This is a form of intrusive garbage-free Treiber stack where [Worker] is also a stack node.
     *
     * The stack is better than a queue (even with the contention on top) because it unparks threads
     * in most-recently used order, improving both performance and locality.
     * It also reduces thread thrashing: if the pool has n threads when only n / 2 are required,
     * the latter half will never be unparked and will terminate itself after [IDLE_WORKER_KEEP_ALIVE_NS].
     *
     * The `Long` value consists of version bits in [PARKED_VERSION_MASK]
     * and the index of the top worker thread in [PARKED_INDEX_MASK].
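     *
     * The encoding can be sketched with hypothetical helpers that restate the masks from the companion
     * object below. Bumping the version on every update makes a compare-and-set against a stale top fail,
     * even if the same worker index has returned to the top in the meantime (the ABA problem).
     *
```kotlin
// Hypothetical restatement of the parkedWorkersStack layout: the low 21 bits hold the top worker index,
// the remaining high bits hold a version counter.
val INDEX_BITS = 21
val INDEX_MASK: Long = (1L shl INDEX_BITS) - 1
val VERSION_MASK: Long = INDEX_MASK.inv()
val VERSION_INC: Long = 1L shl INDEX_BITS

fun topIndex(top: Long): Int = (top and INDEX_MASK).toInt()

fun topVersion(top: Long): Long = (top and VERSION_MASK) shr INDEX_BITS

// Computes the next stack top: the given index with the version incremented by one.
fun nextTop(top: Long, newIndex: Int): Long =
    ((top + VERSION_INC) and VERSION_MASK) or newIndex.toLong()
```
     *
     * Suppose worker 5 is pushed, popped and then pushed again. This yields two tops with index 5 but
     * different versions. A thread still holding the first top value therefore cannot install a stale successor.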
     */
    private val parkedWorkersStack = atomic(0L)

    /**
     * Updates index of the worker at the top of [parkedWorkersStack].
     * It always increments the version so that a concurrent [parkedWorkersStackPop] operation,
     * which might have already decided to put this index at the top, fails its CAS.
     *
     * Note, [newIndex] can be zero for the worker that is being terminated (removed from [workers]).
     */
    fun parkedWorkersStackTopUpdate(worker: Worker, oldIndex: Int, newIndex: Int) {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = if (index == oldIndex) {
                if (newIndex == 0) {
                    parkedWorkersStackNextIndex(worker)
                } else {
                    newIndex
                }
            } else {
                index // no change to index, but update version
            }
            if (updIndex < 0) return@loop // retry
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return
        }
    }

    /**
     * Pushes worker into [parkedWorkersStack].
     * It does nothing if this worker is already physically linked to the stack.
     * This method is invoked only from the worker thread itself.
     * This invocation always precedes [LockSupport.parkNanos].
     * See [Worker.tryPark].
     *
     * Returns `true` if worker was added to the stack by this invocation, `false` if it was already
     * registered in the stack.
     */
    fun parkedWorkersStackPush(worker: Worker): Boolean {
        if (worker.nextParkedWorker !== NOT_IN_STACK) return false // already in stack, bail out
        /*
         * The below loop can be entered only if this worker was not in the stack and, since no other thread
         * can add it to the stack (only the worker itself), this invariant holds while this loop executes.
         */
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = worker.indexInArray
            assert { updIndex != 0 } // only this worker can push itself, cannot be terminated
            worker.nextParkedWorker = workers[index]
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates version to make next CAS fail.
             * Successful CAS of the stack top completes successful push.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) return true
        }
    }

    /**
     * Pops worker from [parkedWorkersStack].
     * It can be invoked concurrently from any thread that is looking for help and needs to unpark some worker.
     * This invocation is always followed by an attempt to [LockSupport.unpark] the resulting worker.
     * See [tryUnpark].
     */
    private fun parkedWorkersStackPop(): Worker? {
        parkedWorkersStack.loop { top ->
            val index = (top and PARKED_INDEX_MASK).toInt()
            val worker = workers[index] ?: return null // stack is empty
            val updVersion = (top + PARKED_VERSION_INC) and PARKED_VERSION_MASK
            val updIndex = parkedWorkersStackNextIndex(worker)
            if (updIndex < 0) return@loop // retry
            /*
             * Another thread can be changing this worker's index at this point, but it
             * also invokes parkedWorkersStackTopUpdate which updates version to make next CAS fail.
             * Successful CAS of the stack top completes successful pop.
             */
            if (parkedWorkersStack.compareAndSet(top, updVersion or updIndex.toLong())) {
                /*
                 * We've just taken the worker out of the stack, but nextParkedWorker is not reset yet, so if a worker is
                 * currently invoking parkedWorkersStackPush it would think it is in the stack and bail out without
                 * adding itself again. It does not matter, since we are going to invoke unpark on the thread
                 * that was popped out of parkedWorkersStack anyway.
                 */
                worker.nextParkedWorker = NOT_IN_STACK
                return worker
            }
        }
    }

    /**
     * Finds the next usable index for [parkedWorkersStack]. The problem is that workers can
     * be terminated and their [Worker.indexInArray] becomes zero, so they cannot be
     * put at the top of the stack. In that case, we look for the next one.
     *
     * Returns `index >= 0` or `-1` for retry.
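     *
     * The skipping logic can be sketched on a hypothetical immutable list. The sketch leaves out the
     * concurrent `NOT_IN_STACK` case that makes the real function return `-1`.
     *
```kotlin
// Hypothetical node: indexInArray == 0 marks a terminated worker that can no longer be the stack top.
class Node(val indexInArray: Int, val next: Node?)

// Returns the first non-zero index reachable from `first`, or 0 when the list is exhausted (empty stack).
fun nextUsableIndex(first: Node?): Int {
    var node = first
    while (node != null) {
        if (node.indexInArray != 0) return node.indexInArray
        node = node.next
    }
    return 0
}
```
     *
     * For a chain `0 -> 0 -> 7`, the two terminated workers are skipped and index 7 becomes the new top.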
     */
    private fun parkedWorkersStackNextIndex(worker: Worker): Int {
        var next = worker.nextParkedWorker
        findNext@ while (true) {
            when {
                next === NOT_IN_STACK -> return -1 // we are too late -- other thread popped this element, retry
                next === null -> return 0 // stack becomes empty
                else -> {
                    val nextWorker = next as Worker
                    val updIndex = nextWorker.indexInArray
                    if (updIndex != 0) return updIndex // found good index for next worker
                    // Otherwise, this worker was terminated and we cannot put it to top anymore, check next
                    next = nextWorker.nextParkedWorker
                }
            }
        }
    }

    /**
     * State of worker threads.
     * [workers] is a dynamic array of lazily created workers up to [maxPoolSize] workers.
     * [createdWorkers] is the count of already created workers (workers with indices `1..createdWorkers` exist).
     * [blockingTasks] is the count of pending (either in the queue or being executed) blocking tasks.
     *
     * Workers array is also used as a lock for workers' creation and termination sequence.
     *
     * **NOTE**: `workers[0]` is always `null` (never used, works as sentinel value), so
     * workers are 1-indexed, code path in [Worker.trySteal] is a bit faster and index swap during termination
     * works properly.
     *
     * Initial size is `Dispatchers.Default` size * 2 to prevent unnecessary resizes for slightly or steadily loaded
     * applications.
     */
    @JvmField
    val workers = ResizableAtomicArray<Worker>((corePoolSize + 1) * 2)

    /**
     * The `Long` value describing the state of workers in this pool.
     * Currently, it includes the number of created workers, blocking tasks and available CPU permits,
     * each occupying [BLOCKING_SHIFT] bits.
     *
     * State layout (highest to lowest bits):
     * | --- number of cpu permits, 22 bits --- | --- number of blocking tasks, 21 bits --- | --- number of created threads, 21 bits --- |
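     *
     * The packing can be sketched with hypothetical helpers that restate the layout above.
     * Adding `1`, `1L shl 21` or `-(1L shl 42)` to the state adjusts exactly one of the three counters.
     *
```kotlin
// Hypothetical restatement of the controlState layout (names are illustrative, not the private constants).
val FIELD_BITS = 21
val FIELD_MASK: Long = (1L shl FIELD_BITS) - 1
val BLOCKING_OFFSET = FIELD_BITS      // blocking tasks live in bits 21..41
val PERMITS_OFFSET = FIELD_BITS * 2   // CPU permits live in bits 42 and up

fun pack(permits: Int, blocking: Int, created: Int): Long =
    (permits.toLong() shl PERMITS_OFFSET) or (blocking.toLong() shl BLOCKING_OFFSET) or created.toLong()

fun createdOf(state: Long): Int = (state and FIELD_MASK).toInt()
fun blockingOf(state: Long): Int = ((state shr BLOCKING_OFFSET) and FIELD_MASK).toInt()
fun permitsOf(state: Long): Int = ((state shr PERMITS_OFFSET) and FIELD_MASK).toInt()
```
     *
     * For example, start from `pack(permits = 4, blocking = 0, created = 0)`. Then create one worker (`+ 1`),
     * submit one blocking task (`+ (1L shl 21)`) and acquire one permit (`- (1L shl 42)`).
     * The result decodes to `created = 1`, `blocking = 1` and `permits = 3`.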
     */
    private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)

    private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
    private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

    private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
    private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
    inline fun availableCpuPermits(state: Long): Int = (state and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()

    // Guarded by synchronization
    private inline fun incrementCreatedWorkers(): Int = createdWorkers(controlState.incrementAndGet())
    private inline fun decrementCreatedWorkers(): Int = createdWorkers(controlState.getAndDecrement())

    private inline fun incrementBlockingTasks() = controlState.addAndGet(1L shl BLOCKING_SHIFT)

    private inline fun decrementBlockingTasks() {
        controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
    }

    private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
        val available = availableCpuPermits(state)
        if (available == 0) return false
        val update = state - (1L shl CPU_PERMITS_SHIFT)
        if (controlState.compareAndSet(state, update)) return true
    }

    private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)

    // This is used as a "stop signal" for close and shutdown functions
    private val _isTerminated = atomic(false)
    val isTerminated: Boolean get() = _isTerminated.value

    companion object {
        // A symbol to mark workers that are not in parkedWorkersStack
        @JvmField
        val NOT_IN_STACK = Symbol("NOT_IN_STACK")

        // Worker ctl states
        private const val PARKED = -1
        private const val CLAIMED = 0
        private const val TERMINATED = 1

        // Masks of control state
        private const val BLOCKING_SHIFT = 21 // 2M threads max
        private const val CREATED_MASK: Long = (1L shl BLOCKING_SHIFT) - 1
        private const val BLOCKING_MASK: Long = CREATED_MASK shl BLOCKING_SHIFT
        private const val CPU_PERMITS_SHIFT = BLOCKING_SHIFT * 2
        private const val CPU_PERMITS_MASK = CREATED_MASK shl CPU_PERMITS_SHIFT

        internal const val MIN_SUPPORTED_POOL_SIZE = 1 // we support 1 for test purposes, but it is not usually used
        internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2

        // Masks of parkedWorkersStack
        private const val PARKED_INDEX_MASK = CREATED_MASK
        private const val PARKED_VERSION_MASK = CREATED_MASK.inv()
        private const val PARKED_VERSION_INC = 1L shl BLOCKING_SHIFT
    }

    override fun execute(command: Runnable) = dispatch(command)

    override fun close() = shutdown(10_000L)

    // Shuts down the current scheduler and waits until all work is done and all threads are stopped.
    fun shutdown(timeout: Long) {
        // atomically set termination flag which is checked when workers are added or removed
        if (!_isTerminated.compareAndSet(false, true)) return
        // make sure we are not waiting for the current thread
        val currentWorker = currentWorker()
        // Capture # of created workers that cannot change anymore (mind the synchronized block!)
        val created = synchronized(workers) { createdWorkers }
        // Shutdown all workers with the only exception of the current thread
        for (i in 1..created) {
            val worker = workers[i]!!
            if (worker !== currentWorker) {
                // Note: this is java.lang.Thread.getState() of type java.lang.Thread.State
                while (worker.getState() != Thread.State.TERMINATED) {
                    LockSupport.unpark(worker)
                    worker.join(timeout)
                }
                // Note: this is CoroutineScheduler.Worker.state of type CoroutineScheduler.WorkerState
                assert { worker.state === WorkerState.TERMINATED } // Expected TERMINATED state
                worker.localQueue.offloadAllWorkTo(globalBlockingQueue) // Doesn't actually matter which queue to use
            }
        }
        // Make sure no more work is added to GlobalQueue from anywhere
        globalBlockingQueue.close()
        globalCpuQueue.close()
        // Finish processing tasks from globalQueue and/or from this worker's local queue
        while (true) {
            val task = currentWorker?.findTask(true)
                ?: globalCpuQueue.removeFirstOrNull()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: break
            runSafely(task)
        }
        // Shutdown current thread
        currentWorker?.tryReleaseCpu(WorkerState.TERMINATED)
        // check & cleanup state
        assert { availableCpuPermits == corePoolSize }
        parkedWorkersStack.value = 0L
        controlState.value = 0L
    }

    /**
     * Dispatches execution of a runnable [block] with a hint to the scheduler about whether
     * this [block] may execute blocking operations (IO, system calls, locking primitives, etc.).
     *
     * [taskContext] -- concurrency context of the given [block].
     * [fair] -- whether this [dispatch] call is fair.
     * If `true`, the task is dispatched in a FIFO manner; this is used for [yield].
     * Note that the caller cannot be sure that it is being executed on a worker thread, for the following reasons:
     *   - [CoroutineStart.UNDISPATCHED]
     *   - A concurrent [close] that effectively shuts down the worker thread.
     */
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, fair: Boolean = false) {
        trackTask() // this is needed for virtual time support
        val task = createTask(block, taskContext)
        val isBlockingTask = task.isBlocking
        // Invariant: we increment counter **before** publishing the task
        // so executing thread can safely decrement the number of blocking tasks
        val stateSnapshot = if (isBlockingTask) incrementBlockingTasks() else 0
        // try to submit the task to the local queue and act depending on the result
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, fair)
        if (notAdded != null) {
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        // Checking 'task' instead of 'notAdded' is completely okay
        if (isBlockingTask) {
            // Use state snapshot to better estimate the number of running threads
            signalBlockingWork(stateSnapshot)
        } else {
            signalCpuWork()
        }
    }

    fun createTask(block: Runnable, taskContext: TaskContext): Task {
        val nanoTime = schedulerTimeSource.nanoTime()
        if (block is Task) {
            block.submissionTime = nanoTime
            block.taskContext = taskContext
            return block
        }
        return block.asTask(nanoTime, taskContext)
    }

    // NB: should only be called from 'dispatch' method due to blocking tasks increment
    private fun signalBlockingWork(stateSnapshot: Long) {
        if (tryUnpark()) return
        // Use state snapshot to avoid accidental thread overprovision
        if (tryCreateWorker(stateSnapshot)) return
        tryUnpark() // Try unpark again in case there was race between permit release and parking
    }

    fun signalCpuWork() {
        if (tryUnpark()) return
        if (tryCreateWorker()) return
        tryUnpark()
    }

    private fun tryCreateWorker(state: Long = controlState.value): Boolean {
        val created = createdWorkers(state)
        val blocking = blockingTasks(state)
        val cpuWorkers = (created - blocking).coerceAtLeast(0)
        /*
         * We check how many threads are available to handle non-blocking work,
         * and create one more if there are not enough of them.
         */
        if (cpuWorkers < corePoolSize) {
            val newCpuWorkers = createNewWorker()
            // If we've created the first cpu worker and corePoolSize > 1 then create
            // one more (second) cpu worker, so that stealing between them is operational
            if (newCpuWorkers == 1 && corePoolSize > 1) createNewWorker()
            if (newCpuWorkers > 0) return true
        }
        return false
    }

    private fun tryUnpark(): Boolean {
        while (true) {
            val worker = parkedWorkersStackPop() ?: return false
            if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) {
                LockSupport.unpark(worker)
                return true
            }
        }
    }

    /**
     * Returns the number of CPU workers after this call (including the new worker),
     * 0 if no worker was created, or -1 if the scheduler is already terminated.
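     *
     * The checks performed under the lock can be restated as a hypothetical pure function over a snapshot
     * of the counters. It ignores the locking and slot bookkeeping, which keeps the arithmetic easy to follow.
     *
```kotlin
// Hypothetical pure restatement of the creation checks; it returns the same codes as the function below.
fun cpuWorkersAfterCreation(
    created: Int,
    blocking: Int,
    corePoolSize: Int,
    maxPoolSize: Int,
    terminated: Boolean
): Int {
    if (terminated) return -1
    // Threads that are not accounted for by pending blocking tasks can serve CPU work.
    val cpuWorkers = (created - blocking).coerceAtLeast(0)
    if (cpuWorkers >= corePoolSize) return 0 // enough CPU workers already
    if (created >= maxPoolSize) return 0     // hard thread limit reached
    return cpuWorkers + 1
}
```
     *
     * Take `corePoolSize = 4` and `maxPoolSize = 8`. Six created threads and three blocking tasks leave
     * three CPU workers, so a fourth is created. With eight created threads and six blocking tasks, nothing
     * is created because `maxPoolSize` is reached, even though only two threads serve CPU work.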
     */
    private fun createNewWorker(): Int {
        val worker: Worker
        return synchronized(workers) {
            // Make sure we're not trying to resurrect terminated scheduler
            if (isTerminated) return -1
            val state = controlState.value
            val created = createdWorkers(state)
            val blocking = blockingTasks(state)
            val cpuWorkers = (created - blocking).coerceAtLeast(0)
            // Double check for overprovision
            if (cpuWorkers >= corePoolSize) return 0
            if (created >= maxPoolSize) return 0
            // start & register new worker, commit index only after successful creation
            val newIndex = createdWorkers + 1
            require(newIndex > 0 && workers[newIndex] == null)
            /*
             * 1) Claim the slot (under a lock) by the newly created worker
             * 2) Make it observable by incrementing the created workers count
             * 3) Only then start the worker, otherwise it may miss its own creation
             */
            worker = Worker(newIndex)
            workers.setSynchronized(newIndex, worker)
            require(newIndex == incrementCreatedWorkers())
            cpuWorkers + 1
        }.also { worker.start() } // Start worker when the lock is released to reduce contention, see #3652
    }

    /**
     * Returns `null` if the task was successfully added; otherwise, returns the task that was either
     * not added or displaced by it (and thus should be added to the global queue).
     */
    private fun Worker?.submitToLocalQueue(task: Task, fair: Boolean): Task? {
        if (this == null) return task
        /*
         * This worker could have been already terminated from this thread by close/shutdown and it should not
         * accept any more tasks into its local queue.
         */
        if (state === WorkerState.TERMINATED) return task
        // Do not add CPU tasks to the local queue if we are not able to execute them
        if (!task.isBlocking && state === WorkerState.BLOCKING) {
            return task
        }
        mayHaveLocalTasks = true
        return localQueue.add(task, fair = fair)
    }

    private fun currentWorker(): Worker? = (Thread.currentThread() as? Worker)?.takeIf { it.scheduler == this }

    /**
     * Returns a string identifying the state of this scheduler for nicer debugging.
     * Note that this method is not atomic and represents a rough state of the pool.
     *
     * State of the queues:
     * b for blocking, c for CPU, d for dormant (retiring).
     * E.g. [1b, 1b, 2c, 1d] means that the pool has
     * two blocking workers with queue size 1, one worker with a CPU permit and queue size 2
     * and one dormant (executing its local queue before parking) worker with queue size 1.
     */
    override fun toString(): String {
        var parkedWorkers = 0
        var blockingWorkers = 0
        var cpuWorkers = 0
        var dormant = 0
        var terminated = 0
        val queueSizes = arrayListOf<String>()
        for (index in 1 until workers.currentLength()) {
            val worker = workers[index] ?: continue
            val queueSize = worker.localQueue.size
            when (worker.state) {
                WorkerState.PARKING -> ++parkedWorkers
                WorkerState.BLOCKING -> {
                    ++blockingWorkers
                    queueSizes += queueSize.toString() + "b" // Blocking
                }

                WorkerState.CPU_ACQUIRED -> {
                    ++cpuWorkers
                    queueSizes += queueSize.toString() + "c" // CPU
                }

                WorkerState.DORMANT -> {
                    ++dormant
                    if (queueSize > 0) queueSizes += queueSize.toString() + "d" // Retiring
                }

                WorkerState.TERMINATED -> ++terminated
            }
        }
        val state = controlState.value
        return "$schedulerName@$hexAddress[" +
            "Pool Size {" +
            "core = $corePoolSize, " +
            "max = $maxPoolSize}, " +
            "Worker States {" +
            "CPU = $cpuWorkers, " +
            "blocking = $blockingWorkers, " +
            "parked = $parkedWorkers, " +
            "dormant = $dormant, " +
            "terminated = $terminated}, " +
            "running workers queues = $queueSizes, " +
            "global CPU queue size = ${globalCpuQueue.size}, " +
            "global blocking queue size = ${globalBlockingQueue.size}, " +
            "Control State {" +
            "created workers= ${createdWorkers(state)}, " +
            "blocking tasks = ${blockingTasks(state)}, " +
            "CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
            "}]"
    }

    fun runSafely(task: Task) {
        try {
            task.run()
        } catch (e: Throwable) {
            val thread = Thread.currentThread()
            thread.uncaughtExceptionHandler.uncaughtException(thread, e)
        } finally {
            unTrackTask()
        }
    }

    internal inner class Worker private constructor() : Thread() {
        init {
            isDaemon = true
            /*
             * `Dispatchers.Default` is used as *the* dispatcher in the containerized environments,
             * isolated by their own classloaders. Workers are populated lazily, thus we are inheriting
             * `Dispatchers.Default` context class loader here instead of using the parent thread's one
             * in order not to accidentally capture temporary application classloader.
             */
            contextClassLoader = this@CoroutineScheduler.javaClass.classLoader
        }

        // guarded by scheduler lock, index in workers array, 0 when not in array (terminated)
        @Volatile // volatile for push/pop operation into parkedWorkersStack
        var indexInArray = 0
            set(index) {
                name = "$schedulerName-worker-${if (index == 0) "TERMINATED" else index.toString()}"
                field = index
            }

        constructor(index: Int) : this() {
            indexInArray = index
        }

        inline val scheduler get() = this@CoroutineScheduler

        @JvmField
        val localQueue: WorkQueue = WorkQueue()

        /**
         * Slot into which tasks are stolen, to avoid re-adding them
         * to the local queue. See [trySteal].
         */
        private val stolenTask: ObjectRef<Task?> = ObjectRef()

        /**
         * Worker state. **Updated only by this worker thread**.
         * By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken.
         * It is used locally by the worker to maintain its own invariants.
         */
        @JvmField
        var state = WorkerState.DORMANT

        /**
         * Worker control state responsible for worker claiming, parking and termination.
         * List of states:
         * [PARKED] -- worker is parked and can self-terminate after a termination deadline.
         * [CLAIMED] -- worker is claimed by an external submitter.
         * [TERMINATED] -- worker is terminated and no longer usable.
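         *
         * The claiming handshake can be sketched with a plain [java.util.concurrent.atomic.AtomicInteger]
         * and hypothetical names. A worker publishes `PARKED` before parking. A submitter may unpark it
         * only after winning the `PARKED -> CLAIMED` compare-and-set, so each park is matched by at most one claim.
         *
```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Illustrative copies of the state values used by the scheduler.
val PARKED_STATE = -1
val CLAIMED_STATE = 0

// Hypothetical stand-in for a worker's control word; a fresh worker starts as CLAIMED.
class WorkerCtl {
    val value = AtomicInteger(CLAIMED_STATE)
}

// The worker announces that it is about to park.
fun announcePark(ctl: WorkerCtl) {
    ctl.value.set(PARKED_STATE)
}

// A submitter may unpark the worker only if it wins the PARKED -> CLAIMED transition.
fun tryClaim(ctl: WorkerCtl): Boolean = ctl.value.compareAndSet(PARKED_STATE, CLAIMED_STATE)
```
         *
         * A second submitter that loses the race gets `false`. Like `tryUnpark`, it then moves on to the next
         * parked worker instead of unparking the same one twice.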
         */
        val workerCtl = atomic(CLAIMED)

        /**
         * It is set to the termination deadline when the worker starts doing [park], and it is reset
         * when there is a task. It serves as protection against spurious wakeups of parkNanos.
         */
        private var terminationDeadline = 0L

        /**
         * Reference to the next worker in the [parkedWorkersStack].
         * It may be `null` if there is no next parked worker.
         * This reference is set to [NOT_IN_STACK] when worker is physically not in stack.
         */
        @Volatile
        var nextParkedWorker: Any? = NOT_IN_STACK

        /*
         * The delay until at least one task in other worker queues will become stealable.
         */
        private var minDelayUntilStealableTaskNs = 0L

        /**
         * The state of the embedded Marsaglia xorshift random number generator, used for work-stealing purposes.
         * It is initialized with a seed.
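         *
         * A 32-bit xorshift generator can be sketched as follows. `XorShift32` is a hypothetical name.
         * The classic 13/17/5 shift triple with an unsigned right shift is an assumption, because the
         * scheduler's own update step is not part of this excerpt.
         *
```kotlin
// Hypothetical stand-alone 32-bit xorshift generator (Marsaglia, shifts 13/17/5).
class XorShift32(seed: Int) {
    // Zero is a fixed point of xorshift, so a zero seed is replaced with an arbitrary non-zero value.
    private var state: Int = if (seed != 0) seed else 42

    fun nextInt(): Int {
        var x = state
        x = x xor (x shl 13)
        x = x xor (x ushr 17)
        x = x xor (x shl 5)
        state = x
        return x
    }

    // Maps the next value into 0 until bound by clearing the sign bit; bound must be positive.
    fun nextInt(bound: Int): Int {
        require(bound > 0) { "bound must be positive: $bound" }
        return (nextInt() and Int.MAX_VALUE) % bound
    }
}
```
         *
         * Seeded with `1`, the first value is `270369`. The zero-seed fallback mirrors the one in the
         * `rngState` initializer.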
         */
        private var rngState: Int = run {
            // This could've been Random.nextInt(), but we are shaving an extra initialization cost, see #4051
            val seed = System.nanoTime().toInt()
            // rngState shouldn't be zero, as required for the xorshift algorithm
            if (seed != 0) return@run seed
            42
        }

        /**
         * Tries to acquire CPU token if worker doesn't have one
         * @return whether worker acquired (or already had) CPU token
         */
        private fun tryAcquireCpuPermit(): Boolean = when {
            state == WorkerState.CPU_ACQUIRED -> true
            this@CoroutineScheduler.tryAcquireCpuPermit() -> {
                state = WorkerState.CPU_ACQUIRED
                true
            }

            else -> false
        }

        /**
         * Releases CPU token if worker has any and changes state to [newState].
         * Returns `true` if CPU permit was returned to the pool
         */
        fun tryReleaseCpu(newState: WorkerState): Boolean {
            val previousState = state
            val hadCpu = previousState == WorkerState.CPU_ACQUIRED
            if (hadCpu) releaseCpuPermit()
            if (previousState != newState) state = newState
            return hadCpu
        }

        override fun run() = runWorker()

706 @JvmField
707 var mayHaveLocalTasks = false
708
709 private fun runWorker() {
710 var rescanned = false
711 while (!isTerminated && state != WorkerState.TERMINATED) {
712 val task = findTask(mayHaveLocalTasks)
713 // Task found. Execute and repeat
714 if (task != null) {
715 rescanned = false
716 minDelayUntilStealableTaskNs = 0L
717 executeTask(task)
718 continue
719 } else {
720 mayHaveLocalTasks = false
721 }
722 /*
723 * No tasks were found:
724 * 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline.
725 * Then its deadline is stored in [minDelayUntilStealableTask]
726 * // '2)' can be found below
727 *
728 * Then just park for that duration (ditto re-scanning).
729 * While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
730 * excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve
731 * it with "spinning via scans" mechanism.
732 * NB: this short potential parking does not interfere with `tryUnpark`
733 */
734 if (minDelayUntilStealableTaskNs != 0L) {
735 if (!rescanned) {
736 rescanned = true
737 } else {
738 rescanned = false
739 tryReleaseCpu(WorkerState.PARKING)
740 interrupted()
741 LockSupport.parkNanos(minDelayUntilStealableTaskNs)
742 minDelayUntilStealableTaskNs = 0L
743 }
744 continue
745 }
746 /*
747 * 2) Or no tasks available, time to park and, potentially, shut down the thread.
748 * Add itself to the stack of parked workers, re-scans all the queues
749 * to avoid missing wake-up (requestCpuWorker) and either starts executing discovered tasks or parks itself awaiting for new tasks.
750 */
751 tryPark()
752 }
753 tryReleaseCpu(WorkerState.TERMINATED)
754 }
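
        /*
        Sketch (hypothetical names IdleAction and onNoTaskFound): the decision runWorker makes after
        findTask returns null, pulled out into a pure function so that the "rescan once, then park"
        policy can be checked in isolation. EnterParkingProtocol stands for the call to tryPark();
        the real loop also releases the CPU permit and clears interrupts before parking.

```kotlin
// Hypothetical extraction of runWorker's "no task found" branch; names are illustrative.
sealed class IdleAction {
    // Loop again and rescan all queues before giving up the CPU permit.
    object Rescan : IdleAction()

    // Release the CPU permit and park until the earliest task becomes stealable.
    data class ParkFor(val nanos: Long) : IdleAction()

    // Nothing is stealable at all: hand over to tryPark().
    object EnterParkingProtocol : IdleAction()
}

// Returns the action to take and the new value of the `rescanned` flag.
fun onNoTaskFound(rescanned: Boolean, minDelayUntilStealableTaskNs: Long): Pair<IdleAction, Boolean> =
    when {
        minDelayUntilStealableTaskNs == 0L -> IdleAction.EnterParkingProtocol to rescanned
        !rescanned -> IdleAction.Rescan to true
        else -> IdleAction.ParkFor(minDelayUntilStealableTaskNs) to false
    }
```

        With a stealable task pending, two consecutive empty scans are needed before the worker
        gives up its CPU permit and parks for the reported delay.
        */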

        /**
         * See [runSingleTaskFromCurrentSystemDispatcher] for rationale and details.
         * This method is tailored to a specific use case and is not expected to be used widely.
         *
         * Returns `0` if a task was executed, `-1` if no task was found and none is pending to become stealable,
         * or the delay in nanoseconds until a task in another worker's queue becomes stealable.
         */
        fun runSingleTask(): Long {
            val stateSnapshot = state
            val isCpuThread = state == WorkerState.CPU_ACQUIRED
            val task = if (isCpuThread) {
                findCpuTask()
            } else {
                findBlockingTask()
            }
            if (task == null) {
                if (minDelayUntilStealableTaskNs == 0L) return -1L
                return minDelayUntilStealableTaskNs
            }
            runSafely(task)
            if (!isCpuThread) decrementBlockingTasks()
            assert { state == stateSnapshot }
            return 0L
        }
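
        /*
        Sketch (hypothetical names SingleTaskOutcome and decodeRunSingleTask): how a caller could
        interpret the Long returned by runSingleTask. The three cases follow from the code above:
        0 after running a task, -1 when nothing is pending, and otherwise the value of
        minDelayUntilStealableTaskNs that trySteal recorded.

```kotlin
// Hypothetical decoding of the value returned by Worker.runSingleTask(); names are illustrative.
sealed class SingleTaskOutcome {
    // A task was found and executed (return value 0).
    object Ran : SingleTaskOutcome()

    // No task was found and none is pending to become stealable (return value -1).
    object NothingToRun : SingleTaskOutcome()

    // No task yet, but one becomes stealable after `nanos` nanoseconds (positive return value).
    data class RetryAfter(val nanos: Long) : SingleTaskOutcome()
}

fun decodeRunSingleTask(result: Long): SingleTaskOutcome = when {
    result == 0L -> SingleTaskOutcome.Ran
    result == -1L -> SingleTaskOutcome.NothingToRun
    result > 0L -> SingleTaskOutcome.RetryAfter(result)
    else -> throw IllegalArgumentException("Unexpected runSingleTask result: $result")
}
```
        */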

        fun isIo() = state == WorkerState.BLOCKING

        // Counterpart to "tryUnpark"
        private fun tryPark() {
            if (!inStack()) {
                parkedWorkersStackPush(this)
                return
            }
            workerCtl.value = PARKED // Update value once
            /*
             * inStack() prevents spurious wakeups, while workerCtl.value == PARKED
             * prevents the following race:
             *
             * - T2 scans the queue, adds itself to the stack, goes to rescan
             * - T2 suspends in 'workerCtl.value = PARKED' line
             * - T1 pops T2 from the stack, claims workerCtl, suspends
             * - T2 fails 'while (inStack())' check, goes to full rescan
             * - T2 adds itself to the stack, parks
             * - T1 unparks T2, bails out with success
             * - T2 unparks and loops in 'while (inStack())'
             */
            while (inStack() && workerCtl.value == PARKED) { // Prevent spurious wakeups
                if (isTerminated || state == WorkerState.TERMINATED) break
                tryReleaseCpu(WorkerState.PARKING)
                interrupted() // Cleanup interruptions
                park()
            }
        }
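
        /*
        Sketch (hypothetical CtlState and SketchCtl): why both the unparking side (tryUnpark) and
        tryTerminateWorker go through workerCtl. Each side only succeeds with a compare-and-set from
        PARKED, so at most one of "claimed by an unparker" and "terminated after idling" can win.
        The real PARKED, CLAIMED and TERMINATED constants and tryUnpark are defined outside this
        section; the initial CLAIMED value below is an assumption.

```kotlin
import java.util.concurrent.atomic.AtomicReference

// Hypothetical control states; the real constants are defined elsewhere in the scheduler.
enum class CtlState { CLAIMED, PARKED, TERMINATED }

class SketchCtl {
    // Assumption: a running worker starts in CLAIMED.
    val value = AtomicReference(CtlState.CLAIMED)

    // Parker side (cf. tryPark): publish PARKED before actually parking.
    fun markParked() = value.set(CtlState.PARKED)

    // Unparker side (cf. tryUnpark): succeeds only if the worker is still PARKED.
    fun tryClaim(): Boolean = value.compareAndSet(CtlState.PARKED, CtlState.CLAIMED)

    // Idle-timeout side (cf. tryTerminateWorker): also requires PARKED, so it races with tryClaim.
    fun tryTerminate(): Boolean = value.compareAndSet(CtlState.PARKED, CtlState.TERMINATED)

    // Mirrors the loop condition in tryPark: keep sleeping only while nobody has claimed us.
    fun shouldKeepParking(): Boolean = value.get() == CtlState.PARKED
}
```
        */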

        private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK

        private fun executeTask(task: Task) {
            terminationDeadline = 0L // reset deadline for termination
            if (state == WorkerState.PARKING) {
                assert { task.isBlocking }
                state = WorkerState.BLOCKING
            }
            if (task.isBlocking) {
                // Always notify about new work when releasing CPU-permit to execute some blocking task
                if (tryReleaseCpu(WorkerState.BLOCKING)) {
                    signalCpuWork()
                }
                runSafely(task)
                decrementBlockingTasks()
                val currentState = state
                // Shutdown sequence of blocking dispatcher
                if (currentState !== WorkerState.TERMINATED) {
                    assert { currentState == WorkerState.BLOCKING } // "Expected BLOCKING state, but has $currentState"
                    state = WorkerState.DORMANT
                }
            } else {
                runSafely(task)
            }
        }

        /*
         * Marsaglia xorshift RNG with period 2^32-1 for work-stealing purposes.
         * ThreadLocalRandom cannot be used because Android has to be supported,
         * and ThreadLocal<Random> is up to 15% slower on Ktor benchmarks.
         */
        fun nextInt(upperBound: Int): Int {
            var r = rngState
            r = r xor (r shl 13)
            r = r xor (r shr 17)
            r = r xor (r shl 5)
            rngState = r
            val mask = upperBound - 1
            // Fast path for a power-of-two bound
            if ((mask and upperBound) == 0) {
                return r and mask
            }
            return (r and Int.MAX_VALUE) % upperBound
        }
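
        /*
        Sketch (hypothetical class name XorShift32): the same xorshift-then-reduce approach as nextInt,
        as a standalone class that can be tested. Two deliberate differences: a zero seed is replaced by 1,
        because the all-zero state is a fixed point of xorshift, and the middle step uses `ushr`, the
        unsigned shift of Marsaglia's 32-bit formulation, whereas nextInt above uses `shr`. With seed 1
        the first state produced by the (13, 17, 5) generator is 270369.

```kotlin
// Hypothetical standalone version of the worker's nextInt(upperBound); not the scheduler's class.
class XorShift32(seed: Int) {
    // An all-zero state never changes under xorshift, so a zero seed is replaced by 1.
    private var state: Int = if (seed != 0) seed else 1

    fun nextInt(upperBound: Int): Int {
        require(upperBound > 0) { "upperBound must be positive, was $upperBound" }
        var r = state
        r = r xor (r shl 13)
        r = r xor (r ushr 17) // unsigned shift, as in Marsaglia's 32-bit xorshift
        r = r xor (r shl 5)
        state = r
        val mask = upperBound - 1
        // Power-of-two bound: keep the low bits, which is non-negative even for negative r.
        if ((mask and upperBound) == 0) return r and mask
        // General bound: clear the sign bit, then reduce (slightly biased, but cheap).
        return (r and Int.MAX_VALUE) % upperBound
    }
}
```
        */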

        private fun park() {
            // Set the termination deadline the first time we are here (it is reset in executeTask)
            if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
            // Actually park
            LockSupport.parkNanos(idleWorkerKeepAliveNs)
            // Try to terminate when we have been idle past the termination deadline.
            // Note that the comparison is written like this to protect against potential nanoTime wraparound.
            if (System.nanoTime() - terminationDeadline >= 0) {
                terminationDeadline = 0L // if the termination attempt fails, the next park() sets a fresh deadline
                tryTerminateWorker()
            }
        }
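
        /*
        Sketch (hypothetical helper names): why park() checks `System.nanoTime() - terminationDeadline >= 0`
        rather than `System.nanoTime() >= terminationDeadline`. The JDK documentation of System.nanoTime
        recommends comparing differences for this reason: if the counter overflows between the two readings,
        the subtraction still has the right sign (as long as the real interval is below 2^63 ns), while a
        direct comparison does not.

```kotlin
// Hypothetical helpers contrasting the overflow-safe and the naive deadline checks.

// Overflow-safe: the difference keeps the right sign even if the counter wrapped in between.
fun isPastDeadline(now: Long, deadline: Long): Boolean = now - deadline >= 0

// Naive: gives the wrong answer once `deadline` has wrapped around to a negative value.
fun isPastDeadlineNaive(now: Long, deadline: Long): Boolean = now >= deadline
```

        For example, starting at Long.MAX_VALUE - 10 with a 100 ns keep-alive, the deadline wraps to a
        negative number. Five nanoseconds later the naive check already reports the deadline as passed,
        while the subtraction-based check correctly waits another 95 ns.
        */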

        /**
         * Attempts to stop this worker. On success, removes it from [workers], decrements [createdWorkers]
         * and moves it to [WorkerState.TERMINATED], so that [runWorker] exits its loop.
         */
        private fun tryTerminateWorker() {
            synchronized(workers) {
                // Make sure we're not racing with the termination of the scheduler
                if (isTerminated) return
                // Enough workers have already been terminated (down to corePoolSize), bail out
                if (createdWorkers <= corePoolSize) return
                /*
                 * See tryUnpark for state reasoning.
                 * If this CAS fails, then we were successfully unparked by another worker and cannot terminate.
                 */
                if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return
                /*
                 * At this point this thread is no longer considered usable for scheduling.
                 * We need a multi-step choreography to reindex workers.
                 *
                 * 1) Read the current worker's index and reset it to zero.
                 */
                val oldIndex = indexInArray
                indexInArray = 0
                /*
                 * Now this worker cannot become the top of parkedWorkersStack, but it can
                 * still be at the stack top via oldIndex.
                 *
                 * 2) Update the top of the stack if it was pointing to oldIndex and make sure no
                 *    pending push/pop operation that might have already retrieved oldIndex can complete.
                 */
                parkedWorkersStackTopUpdate(this, oldIndex, 0)
                /*
                 * 3) Move the last worker into the array slot previously occupied by this worker,
                 *    if the last worker is a different one (sic!).
                 */
                val lastIndex = decrementCreatedWorkers()
                if (lastIndex != oldIndex) {
                    val lastWorker = workers[lastIndex]!!
                    workers.setSynchronized(oldIndex, lastWorker)
                    lastWorker.indexInArray = oldIndex
                    /*
                     * Now lastWorker is available at both indices in the array, but it can
                     * still be at the stack top via its lastIndex.
                     *
                     * 4) Update the top of the stack lastIndex -> oldIndex and make sure no
                     *    pending push/pop operation that might have already retrieved lastIndex can complete.
                     */
                    parkedWorkersStackTopUpdate(lastWorker, lastIndex, oldIndex)
                }
                /*
                 * 5) It is safe to clear the reference from the workers array now.
                 */
                workers.setSynchronized(lastIndex, null)
            }
            state = WorkerState.TERMINATED
        }
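
        /*
        Sketch (hypothetical SketchRegistry and SketchWorkerSlot): steps 1, 3 and 5 of tryTerminateWorker,
        which keep the 1-based index range 1..createdWorkers dense by moving the last worker into the hole
        left by the terminated one. Synchronization on workers and the parkedWorkersStackTopUpdate calls
        (steps 2 and 4) are left out. The sketch assumes, as the code above implies, that
        decrementCreatedWorkers() returns the index of the last worker before the decrement.

```kotlin
// Hypothetical model of the reindexing in tryTerminateWorker; not the scheduler's real types.
class SketchWorkerSlot(val name: String) {
    // 0 means "not registered", as for Worker.indexInArray after termination.
    var indexInArray = 0
}

class SketchRegistry(capacity: Int) {
    // Slot 0 is never used, so live workers occupy indices 1..created.
    private val slots = arrayOfNulls<SketchWorkerSlot>(capacity + 1)
    var created = 0
        private set

    fun add(worker: SketchWorkerSlot) {
        created++
        worker.indexInArray = created
        slots[created] = worker
    }

    fun remove(worker: SketchWorkerSlot) {
        // Step 1: remember the old index and mark the worker as unregistered.
        val oldIndex = worker.indexInArray
        worker.indexInArray = 0
        // Step 3: decrement the count; lastIndex is the last occupied index before the decrement.
        val lastIndex = created--
        if (lastIndex != oldIndex) {
            val lastWorker = slots[lastIndex]!!
            slots[oldIndex] = lastWorker
            lastWorker.indexInArray = oldIndex
        }
        // Step 5: the last slot now holds either a duplicate or the removed worker itself.
        slots[lastIndex] = null
    }

    operator fun get(index: Int): SketchWorkerSlot? = slots[index]
}
```
        */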

        fun findTask(mayHaveLocalTasks: Boolean): Task? {
            if (tryAcquireCpuPermit()) return findAnyTask(mayHaveLocalTasks)
            /*
             * If we can't acquire a CPU permit, attempt to find a blocking task:
             * - Check if our queue has one (maybe mixed in with CPU tasks)
             * - Poll the global queue and try to steal
             */
            return findBlockingTask()
        }

        // NB: used by findTask (when no CPU permit is available) and by runSingleTask
        private fun findBlockingTask(): Task? {
            return localQueue.pollBlocking()
                ?: globalBlockingQueue.removeFirstOrNull()
                ?: trySteal(STEAL_BLOCKING_ONLY)
        }

        // NB: ONLY for runSingleTask method
        private fun findCpuTask(): Task? {
            return localQueue.pollCpu()
                ?: globalCpuQueue.removeFirstOrNull()
                ?: trySteal(STEAL_CPU_ONLY)
        }

        private fun findAnyTask(scanLocalQueue: Boolean): Task? {
            /*
             * Anti-starvation mechanism: probabilistically poll either local
             * or global queue to ensure progress for both external and internal tasks.
             */
            if (scanLocalQueue) {
                val globalFirst = nextInt(2 * corePoolSize) == 0
                if (globalFirst) pollGlobalQueues()?.let { return it }
                localQueue.poll()?.let { return it }
                if (!globalFirst) pollGlobalQueues()?.let { return it }
            } else {
                pollGlobalQueues()?.let { return it }
            }
            return trySteal(STEAL_ANY)
        }
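
        /*
        Sketch (hypothetical pollOrder and globalFirstRate): the order in which findAnyTask consults the
        queues when the local queue may contain tasks. The global queues go first with probability
        1/(2 * corePoolSize), matching the `nextInt(2 * corePoolSize) == 0` draw above, and stealing is
        always the last resort. A seeded kotlin.random.Random stands in for the worker's own nextInt.

```kotlin
import kotlin.random.Random

// Hypothetical model of findAnyTask's polling order; the strings stand for the real queues.
fun pollOrder(random: Random, corePoolSize: Int): List<String> {
    // Same draw as findAnyTask: a 1-in-(2 * corePoolSize) chance to check the global queues first.
    val globalFirst = random.nextInt(2 * corePoolSize) == 0
    val queues = if (globalFirst) listOf("global", "local") else listOf("local", "global")
    // Stealing from other workers is attempted only after both queues came up empty.
    return queues + "steal"
}

// Fraction of trials in which the global queues were polled first.
fun globalFirstRate(corePoolSize: Int, trials: Int, seed: Int = 42): Double {
    val random = Random(seed)
    val hits = (1..trials).count { pollOrder(random, corePoolSize).first() == "global" }
    return hits.toDouble() / trials
}
```

        With corePoolSize = 4, roughly one poll in eight starts with the global queues, so externally
        submitted tasks keep making progress even when a worker's local queue never drains.
        */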

        private fun pollGlobalQueues(): Task? {
            if (nextInt(2) == 0) {
                globalCpuQueue.removeFirstOrNull()?.let { return it }
                return globalBlockingQueue.removeFirstOrNull()
            } else {
                globalBlockingQueue.removeFirstOrNull()?.let { return it }
                return globalCpuQueue.removeFirstOrNull()
            }
        }

        private fun trySteal(stealingMode: StealingMode): Task? {
            val created = createdWorkers
            // 0 to await an initialization and 1 to avoid excess stealing on single-core machines
            if (created < 2) {
                return null
            }

            var currentIndex = nextInt(created)
            var minDelay = Long.MAX_VALUE
            repeat(created) {
                ++currentIndex
                if (currentIndex > created) currentIndex = 1
                val worker = workers[currentIndex]
                if (worker !== null && worker !== this) {
                    val stealResult = worker.localQueue.trySteal(stealingMode, stolenTask)
                    if (stealResult == TASK_STOLEN) {
                        val result = stolenTask.element
                        stolenTask.element = null
                        return result
                    } else if (stealResult > 0) {
                        minDelay = min(minDelay, stealResult)
                    }
                }
            }
            minDelayUntilStealableTaskNs = if (minDelay != Long.MAX_VALUE) minDelay else 0
            return null
        }
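
        /*
        Sketch (hypothetical scanVictims and StealAttempt): the victim-selection loop of trySteal with the
        queues abstracted behind a callback. From a starting offset it visits every index in 1..created
        exactly once in circular order, skips the current worker, returns the first stolen task, and
        otherwise remembers the smallest positive delay reported by a victim (what trySteal stores in
        minDelayUntilStealableTaskNs). In trySteal the offset comes from nextInt(created), so after the
        first increment the scan starts at a pseudo-random index in 1..created. Null slots and the
        TASK_STOLEN encoding are left out.

```kotlin
// Hypothetical result of one steal attempt: a task, or no task plus a delay until one is stealable.
data class StealAttempt<T>(val task: T?, val delayNs: Long)

// Returns the stolen task (if any) and the minimal positive delay seen (0 if none).
fun <T> scanVictims(
    created: Int,
    selfIndex: Int,
    start: Int,
    stealFrom: (Int) -> StealAttempt<T>
): Pair<T?, Long> {
    if (created < 2) return null to 0L
    var currentIndex = start
    var minDelay = Long.MAX_VALUE
    repeat(created) {
        // Advance circularly over the 1-based indices 1..created.
        ++currentIndex
        if (currentIndex > created) currentIndex = 1
        if (currentIndex != selfIndex) {
            val attempt = stealFrom(currentIndex)
            if (attempt.task != null) return attempt.task to 0L
            if (attempt.delayNs > 0) minDelay = minOf(minDelay, attempt.delayNs)
        }
    }
    return null to (if (minDelay != Long.MAX_VALUE) minDelay else 0L)
}
```
        */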
    }

    enum class WorkerState {
        /**
         * Has a CPU permit and either executes a task with [Task.isBlocking] `== false` or tries to find one.
         */
        CPU_ACQUIRED,

        /**
         * Executing a task with [Task.isBlocking] `== true`.
         */
        BLOCKING,

        /**
         * Currently parked.
         */
        PARKING,

        /**
         * Tries to execute its local work and then goes to indefinite sleep as a worker that is no longer needed.
         */
        DORMANT,

        /**
         * Terminal state, will no longer be used.
         */
        TERMINATED
    }
}

/**
 * Checks if the thread is part of a thread pool that supports coroutines.
 * This function is needed for integration with BlockHound.
 */
@JvmName("isSchedulerWorker")
internal fun isSchedulerWorker(thread: Thread) = thread is CoroutineScheduler.Worker

/**
 * Checks if the thread is running a CPU-bound task.
 * This function is needed for integration with BlockHound.
 */
@JvmName("mayNotBlock")
internal fun mayNotBlock(thread: Thread) = thread is CoroutineScheduler.Worker &&
    thread.state == CoroutineScheduler.WorkerState.CPU_ACQUIRED
