1 /*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3 */
4
5 package kotlinx.coroutines.internal
6
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10
11 /**
12 * The result of .limitedParallelism(x) call, a dispatcher
13 * that wraps the given dispatcher, but limits the parallelism level, while
14 * trying to emulate fairness.
15 *
16 * ### Implementation details
17 *
18 * By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
19 * to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
20 * dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
21 * in order to avoid starvation of the underlying dispatcher.
22 *
23 * Such behavior is crucial to be compatible with any underlying dispatcher implementation without
24 * direct cooperation.
25 */
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Number of currently allocated "worker-loop" tasks; by construction never exceeds [parallelism].
    // Atomic is necessary here for the sake of K/N memory ordering,
    // there is no need in atomic operations for this property
    private val runningWorkers = atomic(0)

    // Tasks dispatched to this view that are not yet picked up by a worker.
    // Multi-consumer: several workers poll this queue concurrently.
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    /**
     * A view with a parallelism level not lower than this one's is functionally
     * indistinguishable from this dispatcher, so `this` is returned as a shortcut;
     * otherwise a narrower view is created over this dispatcher.
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        if (parallelism >= this.parallelism) return this
        return super.limitedParallelism(parallelism)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatch(this, worker)
        }
    }

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatchYield(this, worker)
        }
    }

    /**
     * Tries to dispatch the given [block].
     * If there are not enough workers, it starts a new one via [startWorker].
     *
     * Note the ordering: the task is enqueued *before* the worker count is checked,
     * so either this call starts a worker or an already-running worker is guaranteed
     * to observe the task in the queue.
     */
    private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
        // Add task to queue so running workers will be able to see that
        queue.addLast(block)
        // Fast path: enough workers are running already; one of them will pick the task up.
        if (runningWorkers.value >= parallelism) return
        // allocation may fail if some workers were launched in parallel or a worker temporarily decreased
        // `runningWorkers` when they observed an empty queue.
        if (!tryAllocateWorker()) return
        // The polled task is not necessarily `block`: an existing worker may have taken it already.
        val task = obtainTaskOrDeallocateWorker() ?: return
        startWorker(Worker(task))
    }

    /**
     * Tries to obtain the permit to start a new worker.
     */
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            // Re-check under the lock: a concurrent dispatch may have taken the last slot
            // between the lock-free fast-path check and this point.
            if (runningWorkers.value >= parallelism) return false
            runningWorkers.incrementAndGet()
            return true
        }
    }

    /**
     * Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
     */
    private fun obtainTaskOrDeallocateWorker(): Runnable? {
        while (true) {
            when (val nextTask = queue.removeFirstOrNull()) {
                null -> synchronized(workerAllocationLock) {
                    // Tentatively give up the worker slot, then re-check the queue under the lock
                    // so a task enqueued concurrently cannot be left without a worker to run it.
                    runningWorkers.decrementAndGet()
                    if (queue.size == 0) return null
                    // A task arrived after the failed poll; reclaim the slot and retry the loop.
                    runningWorkers.incrementAndGet()
                }
                else -> return nextTask
            }
        }
    }

    /**
     * A worker that polls the queue and runs tasks until there are no more of them.
     *
     * It always stores the next task to run. This is done in order to prevent the possibility of the fairness
     * re-dispatch happening when there are no more tasks in the queue. This is important because, after all the
     * actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
     * perform any more dispatches.
     */
    private inner class Worker(private var currentTask: Runnable) : Runnable {
        override fun run() {
            var fairnessCounter = 0
            while (true) {
                try {
                    currentTask.run()
                } catch (e: Throwable) {
                    // A failing task must not kill the worker loop; report it and keep going.
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                // Grab the next task *before* any re-dispatch; returning here releases the worker slot.
                currentTask = obtainTaskOrDeallocateWorker() ?: return
                // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
                if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this@LimitedDispatcher)) {
                    // Do "yield" to let other views execute their runnable as well
                    // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
                    dispatcher.dispatch(this@LimitedDispatcher, this)
                    return
                }
            }
        }
    }
}
131
// Save a few bytecode ops
// Precondition shared by all `limitedParallelism` implementations:
// a parallelism level must be a strictly positive integer.
internal fun Int.checkParallelism() {
    require(this >= 1) { "Expected positive parallelism level, but got $this" }
}
134