• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
1 @file:OptIn(ObsoleteWorkersApi::class)
2 
3 package kotlinx.coroutines
4 
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.concurrent.AtomicReference
10 import kotlin.native.concurrent.*
11 import kotlin.time.*
12 import kotlin.time.Duration.Companion.milliseconds
13 
14 @DelicateCoroutinesApi
newFixedThreadPoolContextnull15 public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
16     require(nThreads >= 1) { "Expected at least one thread, but got: $nThreads" }
17     return MultiWorkerDispatcher(name, nThreads)
18 }
19 
20 internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
21     private val worker = Worker.start(name = name)
22 
dispatchnull23     override fun dispatch(context: CoroutineContext, block: Runnable) {
24         worker.executeAfter(0L) { block.run() }
25     }
26 
scheduleResumeAfterDelaynull27     override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
28         val handle = schedule(timeMillis, Runnable {
29             with(continuation) { resumeUndispatched(Unit) }
30         })
31         continuation.disposeOnCancellation(handle)
32     }
33 
invokeOnTimeoutnull34     override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
35         schedule(timeMillis, block)
36 
37     private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle {
38         // Workers don't have an API to cancel sent "executeAfter" block, but we are trying
39         // to control the damage and reduce reachable objects by nulling out `block`
40         // that may retain a lot of references, and leaving only an empty shell after a timely disposal
41         // This is a class and not an object with `block` in a closure because that would defeat the purpose.
42         class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
43             private val disposableHolder = AtomicReference<Runnable?>(block)
44 
45             override fun invoke() {
46                 disposableHolder.value?.run()
47             }
48 
49             override fun dispose() {
50                 disposableHolder.value = null
51             }
52 
53             fun isDisposed() = disposableHolder.value == null
54         }
55 
56         fun Worker.runAfterDelay(block: DisposableBlock, targetMoment: TimeMark) {
57             if (block.isDisposed()) return
58             val durationUntilTarget = -targetMoment.elapsedNow()
59             val quantum = 100.milliseconds
60             if (durationUntilTarget > quantum) {
61                 executeAfter(quantum.inWholeMicroseconds) { runAfterDelay(block, targetMoment) }
62             } else {
63                 executeAfter(maxOf(0, durationUntilTarget.inWholeMicroseconds), block)
64             }
65         }
66 
67         val disposableBlock = DisposableBlock(block)
68         val targetMoment = TimeSource.Monotonic.markNow() + timeMillis.milliseconds
69         worker.runAfterDelay(disposableBlock, targetMoment)
70         return disposableBlock
71     }
72 
closenull73     override fun close() {
74         worker.requestTermination().result // Note: calling "result" blocks
75     }
76 }
77 
78 private class MultiWorkerDispatcher(
79     private val name: String,
80     private val workersCount: Int
81 ) : CloseableCoroutineDispatcher() {
82     private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
83     private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)
<lambda>null84     private val workerPool = OnDemandAllocatingPool(workersCount) {
85         Worker.start(name = "$name-$it").apply {
86             executeAfter { workerRunLoop() }
87         }
88     }
89 
90     /**
91      * (number of tasks - number of workers) * 2 + (1 if closed)
92      */
93     private val tasksAndWorkersCounter = atomic(0L)
94 
95     @Suppress("NOTHING_TO_INLINE")
isClosednull96     private inline fun Long.isClosed() = this and 1L == 1L
97     @Suppress("NOTHING_TO_INLINE")
98     private inline fun Long.hasTasks() = this >= 2
99     @Suppress("NOTHING_TO_INLINE")
100     private inline fun Long.hasWorkers() = this < 0
101 
102     private fun workerRunLoop() = runBlocking {
103         while (true) {
104             val state = tasksAndWorkersCounter.getAndUpdate {
105                 if (it.isClosed() && !it.hasTasks()) return@runBlocking
106                 it - 2
107             }
108             if (state.hasTasks()) {
109                 // we promised to process a task, and there are some
110                 tasksQueue.receive().run()
111             } else {
112                 try {
113                     suspendCancellableCoroutine {
114                         val result = availableWorkers.trySend(it)
115                         checkChannelResult(result)
116                     }.run()
117                 } catch (e: CancellationException) {
118                     /** we are cancelled from [close] and thus will never get back to this branch of code,
119                     but there may still be pending work, so we can't just exit here. */
120                 }
121             }
122         }
123     }
124 
125     // a worker that promised to be here and should actually arrive, so we wait for it in a blocking manner.
obtainWorkernull126     private fun obtainWorker(): CancellableContinuation<Runnable> =
127         availableWorkers.tryReceive().getOrNull() ?: runBlocking { availableWorkers.receive() }
128 
dispatchnull129     override fun dispatch(context: CoroutineContext, block: Runnable) {
130         val state = tasksAndWorkersCounter.getAndUpdate {
131             if (it.isClosed())
132                 throw IllegalStateException("Dispatcher $name was closed, attempted to schedule: $block")
133             it + 2
134         }
135         if (state.hasWorkers()) {
136             // there are workers that have nothing to do, let's grab one of them
137             obtainWorker().resume(block)
138         } else {
139             workerPool.allocate()
140             // no workers are available, we must queue the task
141             val result = tasksQueue.trySend(block)
142             checkChannelResult(result)
143         }
144     }
145 
limitedParallelismnull146     override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
147         parallelism.checkParallelism()
148         if (parallelism >= workersCount) {
149             return namedOrThis(name)
150         }
151         return super.limitedParallelism(parallelism, name)
152     }
153 
closenull154     override fun close() {
155         tasksAndWorkersCounter.getAndUpdate { if (it.isClosed()) it else it or 1L }
156         val workers = workerPool.close() // no new workers will be created
157         while (true) {
158             // check if there are workers that await tasks in their personal channels, we need to wake them up
159             val state = tasksAndWorkersCounter.getAndUpdate {
160                 if (it.hasWorkers()) it + 2 else it
161             }
162             if (!state.hasWorkers())
163                 break
164             obtainWorker().cancel()
165         }
166         /*
167          * Here we cannot avoid waiting on `.result`, otherwise it will lead
168          * to a native memory leak, including a pthread handle.
169          */
170         val requests = workers.map { it.requestTermination() }
171         requests.map { it.result }
172     }
173 
checkChannelResultnull174     private fun checkChannelResult(result: ChannelResult<*>) {
175         if (!result.isSuccess)
176             throw IllegalStateException(
177                 "Internal invariants of $this were violated, please file a bug to kotlinx.coroutines",
178                 result.exceptionOrNull()
179             )
180     }
181 }
182