• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 /*
2  * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
3  */
4 
5 package kotlinx.coroutines.internal
6 
7 import kotlinx.atomicfu.*
8 import kotlinx.coroutines.*
9 import kotlin.coroutines.*
10 
/**
 * The result of a `.limitedParallelism(x)` call: a dispatcher that wraps the given [dispatcher]
 * but limits the parallelism level, while trying to emulate fairness.
 *
 * ### Implementation details
 *
 * By design, `LimitedDispatcher` never [dispatches][CoroutineDispatcher.dispatch] originally sent tasks
 * to the underlying dispatcher. Instead, it maintains its own queue of tasks sent to this dispatcher and
 * dispatches at most [parallelism] "worker-loop" tasks that poll the underlying queue and cooperatively preempt
 * in order to avoid starvation of the underlying dispatcher.
 *
 * Such behavior is crucial to be compatible with any underlying dispatcher implementation without
 * direct cooperation.
 *
 * ### Failure handling
 *
 * If the underlying dispatcher throws while a worker is being started or re-dispatched, the worker slot
 * that was reserved for it is released before the exception is rethrown. Otherwise every such failure
 * would permanently reduce the number of workers this dispatcher is able to run.
 */
internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Delay by (dispatcher as? Delay ?: DefaultDelay) {

    // Atomic is necessary here for the sake of K/N memory ordering,
    // there is no need in atomic operations for this property:
    // every modification happens while holding `workerAllocationLock`.
    private val runningWorkers = atomic(0)

    // Tasks submitted to this dispatcher that no worker has picked up yet.
    private val queue = LockFreeTaskQueue<Runnable>(singleConsumer = false)

    // A separate object that we can synchronize on for K/N
    private val workerAllocationLock = SynchronizedObject()

    /**
     * Returns this dispatcher when the requested [parallelism] is not stricter than the current limit;
     * otherwise delegates to the default implementation.
     *
     * @throws IllegalArgumentException if [parallelism] is less than 1
     */
    @ExperimentalCoroutinesApi
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        if (parallelism >= this.parallelism) return this
        return super.limitedParallelism(parallelism)
    }

    /**
     * Enqueues [block] and, if a worker slot is available, starts a new worker through the underlying
     * dispatcher's `dispatch`. The incoming [context] is not forwarded: the underlying dispatcher
     * receives this dispatcher as the context.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatch(this, worker)
        }
    }

    /**
     * Same as [dispatch], but a newly started worker is handed to the underlying dispatcher's
     * `dispatchYield`.
     */
    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) {
        dispatchInternal(block) { worker ->
            dispatcher.dispatchYield(this, worker)
        }
    }

    /**
     * Tries to dispatch the given [block].
     * If there are not enough workers, it starts a new one via [startWorker].
     *
     * If [startWorker] throws, the slot reserved for the new worker is released and the exception
     * is rethrown to the caller.
     */
    private inline fun dispatchInternal(block: Runnable, startWorker: (Worker) -> Unit) {
        // Add task to queue so running workers will be able to see that
        queue.addLast(block)
        if (runningWorkers.value >= parallelism) return
        // allocation may fail if some workers were launched in parallel or a worker temporarily decreased
        // `runningWorkers` when they observed an empty queue.
        if (!tryAllocateWorker()) return
        val task = obtainTaskOrDeallocateWorker() ?: return
        try {
            startWorker(Worker(task))
        } catch (e: Throwable) {
            // The worker never started, so nobody else will ever give its slot back.
            // Without this, a failing underlying dispatcher would permanently lower the effective parallelism.
            // Note that `task` has already been removed from the queue at this point and is not re-queued.
            deallocateWorker()
            throw e
        }
    }

    /**
     * Tries to obtain the permit to start a new worker.
     *
     * @return `true` if a slot was reserved, `false` if [parallelism] workers are already accounted for
     */
    private fun tryAllocateWorker(): Boolean {
        synchronized(workerAllocationLock) {
            if (runningWorkers.value >= parallelism) return false
            runningWorkers.incrementAndGet()
            return true
        }
    }

    /**
     * Releases a worker slot whose worker will not continue running.
     */
    private fun deallocateWorker() {
        synchronized(workerAllocationLock) {
            runningWorkers.decrementAndGet()
        }
    }

    /**
     * Obtains the next task from the queue, or logically deallocates the worker if the queue is empty.
     *
     * @return the next task, or `null` if the queue was empty and the caller's slot has been released
     */
    private fun obtainTaskOrDeallocateWorker(): Runnable? {
        while (true) {
            when (val nextTask = queue.removeFirstOrNull()) {
                null -> synchronized(workerAllocationLock) {
                    // Give the slot up first, then re-check the queue under the lock:
                    // if a task showed up in the meantime, take the slot back and retry polling.
                    runningWorkers.decrementAndGet()
                    if (queue.size == 0) return null
                    runningWorkers.incrementAndGet()
                }
                else -> return nextTask
            }
        }
    }

    /**
     * A worker that polls the queue and runs tasks until there are no more of them.
     *
     * It always stores the next task to run. This is done in order to prevent the possibility of the fairness
     * re-dispatch happening when there are no more tasks in the queue. This is important because, after all the
     * actual tasks are done, nothing prevents the user from closing the dispatcher and making it incorrect to
     * perform any more dispatches.
     */
    private inner class Worker(private var currentTask: Runnable) : Runnable {
        override fun run() {
            try {
                var fairnessCounter = 0
                while (true) {
                    try {
                        currentTask.run()
                    } catch (e: Throwable) {
                        handleCoroutineException(EmptyCoroutineContext, e)
                    }
                    // A `null` result means the slot has already been released, so a plain return is enough.
                    currentTask = obtainTaskOrDeallocateWorker() ?: return
                    // 16 is our out-of-thin-air constant to emulate fairness. Used in JS dispatchers as well
                    if (++fairnessCounter >= 16 && dispatcher.isDispatchNeeded(this@LimitedDispatcher)) {
                        // Do "yield" to let other views execute their runnable as well
                        // Note that we do not decrement 'runningWorkers' as we are still committed to our part of work
                        dispatcher.dispatch(this@LimitedDispatcher, this)
                        return
                    }
                }
            } catch (e: Throwable) {
                // Reached only when the worker loop itself fails, e.g. the underlying dispatcher throws from
                // `isDispatchNeeded` or from the fairness re-dispatch. This worker will not run again,
                // so its slot must be released before the failure is propagated.
                deallocateWorker()
                throw e
            }
        }
    }
}
131 
/**
 * Validates that the receiver is a positive parallelism level (at least 1).
 *
 * Extracted into a standalone function to save a few bytecode ops.
 *
 * @throws IllegalArgumentException if the receiver is less than 1
 */
internal fun Int.checkParallelism() {
    require(this >= 1) { "Expected positive parallelism level, but got $this" }
}
134