package kotlinx.coroutines.internal
2
3 import kotlinx.atomicfu.*
4 import kotlinx.coroutines.*
5 import kotlin.coroutines.*
6
7 /**
8 * The result of .limitedParallelism(x) call, a dispatcher
9 * that wraps the given dispatcher, but limits the parallelism level, while
10 * trying to emulate fairness.
11 *
12 * ### Implementation details
13 *
14 * By design, 'LimitedDispatcher' never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
15 * to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
16 * dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
17 * in order to avoid starvation of the underlying dispatcher.
18 *
19 * Such behavior is crucial to be compatible with any underlying dispatcher implementation without
20 * direct cooperation.
21 */
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Atomic is necessary here for the sake of K/N memory ordering,
    // there is no need in atomic operations for this property
    private val runningWorkers = atomic(0)

    // Tasks submitted to this view but not yet picked up by a worker.
    // Several workers may poll concurrently, hence singleConsumer = false.
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    /**
     * A limit that is not lower than the current one is a no-op (modulo renaming);
     * a stricter limit delegates to the default layering implementation.
     */
    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        parallelism.checkParallelism()
        if (parallelism >= this.parallelism) return namedOrThis(name)
        return super.limitedParallelism(parallelism, name)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.safeDispatch(this, worker)
        }
    }

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatchYield(this, worker)
        }
    }

    /**
     * Tries to dispatch the given [block].
     * If there are not enough workers, it starts a new one via [startWorker].
     */
    private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
        // Add task to queue so running workers will be able to see that
        queue.addLast(block)
        // Fast path: all permits are taken, an existing worker will pick the task up.
        if (runningWorkers.value >= parallelism) return
        // allocation may fail if some workers were launched in parallel or a worker temporarily decreased
        // `runningWorkers` when they observed an empty queue.
        if (!tryAllocateWorker()) return
        val task = obtainTaskOrDeallocateWorker() ?: return
        startWorker(Worker(task))
    }

    /**
     * Tries to obtain the permit to start a new worker.
     */
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            // Double-check under the lock so two concurrent dispatchers cannot
            // both grab the last permit.
            if (runningWorkers.value >= parallelism) return false
            runningWorkers.incrementAndGet()
            return true
        }
    }

    /**
     * Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
     */
    private fun obtainTaskOrDeallocateWorker(): Runnable? {
        while (true) {
            when (val nextTask = queue.removeFirstOrNull()) {
                null -> synchronized(workerAllocationLock) {
                    // Tentatively release our permit; re-check the queue under the lock,
                    // because a task may have been enqueued between the failed poll above
                    // and taking the lock.
                    runningWorkers.decrementAndGet()
                    if (queue.size == 0) return null
                    // A task did appear: re-acquire the permit and retry the poll.
                    runningWorkers.incrementAndGet()
                }
                else -> return nextTask
            }
        }
    }

    override fun toString() = name ?: "$dispatcher.limitedParallelism($parallelism)"

    /**
     * A worker that polls the queue and runs tasks until there are no more of them.
     *
     * It always stores the next task to run. This is done in order to prevent the possibility of the fairness
     * re-dispatch happening when there are no more tasks in the queue. This is important because, after all the
     * actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
     * perform any more dispatches.
     */
    private inner class Worker(private var currentTask: Runnable) : Runnable {
        override fun run() {
            var fairnessCounter = 0
            while (true) {
                try {
                    currentTask.run()
                } catch (e: Throwable) {
                    // A task's failure must not kill the worker loop; report and continue.
                    handleCoroutineException(EmptyCoroutineContext, e)
                }
                currentTask = obtainTaskOrDeallocateWorker() ?: return
                // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
                if (++fairnessCounter >= 16 && dispatcher.safeIsDispatchNeeded(this@LimitedDispatcher)) {
                    // Do "yield" to let other views execute their runnable as well
                    // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
                    dispatcher.safeDispatch(this@LimitedDispatcher, this)
                    return
                }
            }
        }
    }
}
129
/**
 * Validates that this integer is a legal parallelism level.
 *
 * @throws IllegalArgumentException if the value is less than 1.
 */
internal fun Int.checkParallelism() {
    require(this >= 1) { "Expected positive parallelism level, but got $this" }
}
131
/**
 * Returns this dispatcher wrapped in a [NamedDispatcher] carrying [name],
 * or this very dispatcher unchanged when no name is supplied.
 */
internal fun CoroutineDispatcher.namedOrThis(name: String?): CoroutineDispatcher =
    if (name == null) this else NamedDispatcher(this, name)