1 @file:OptIn(ObsoleteWorkersApi::class)
2
3 package kotlinx.coroutines
4
5 import kotlinx.atomicfu.*
6 import kotlinx.coroutines.channels.*
7 import kotlinx.coroutines.internal.*
8 import kotlin.coroutines.*
9 import kotlin.concurrent.AtomicReference
10 import kotlin.native.concurrent.*
11 import kotlin.time.*
12 import kotlin.time.Duration.Companion.milliseconds
13
/**
 * Creates a closeable dispatcher backed by up to [nThreads] workers.
 *
 * @param nThreads the size of the pool; must be at least 1.
 * @param name the base name of the pool; each worker is named `"$name-<index>"`.
 * @return the new dispatcher; the caller is responsible for closing it.
 * @throws IllegalArgumentException if [nThreads] is less than 1.
 */
@DelicateCoroutinesApi
public actual fun newFixedThreadPoolContext(nThreads: Int, name: String): CloseableCoroutineDispatcher {
    if (nThreads < 1) {
        throw IllegalArgumentException("Expected at least one thread, but got: $nThreads")
    }
    return MultiWorkerDispatcher(name, nThreads)
}
19
/**
 * A closeable dispatcher that runs every dispatched block on a single dedicated [Worker]
 * and implements [Delay] by posting delayed jobs to that same worker.
 *
 * @param name the name given to the underlying worker.
 */
internal class WorkerDispatcher(name: String) : CloseableCoroutineDispatcher(), Delay {
    private val worker = Worker.start(name = name)

    /** Posts [block] to the worker with a zero delay. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        worker.executeAfter(0L) { block.run() }
    }

    /**
     * Resumes [continuation] (undispatched, from the worker) once [timeMillis] have passed.
     * The scheduled resumption is disposed if the continuation gets cancelled first.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resumeTask = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        val scheduled = schedule(timeMillis, resumeTask)
        continuation.disposeOnCancellation(scheduled)
    }

    /** Runs [block] on the worker after [timeMillis]; disposing the returned handle prevents the run. */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        schedule(timeMillis, block)

    /**
     * Arranges for [block] to run on the worker roughly [timeMillis] milliseconds from now.
     *
     * @return a handle whose disposal drops the reference to [block], so it will not be run.
     */
    private fun schedule(timeMillis: Long, block: Runnable): DisposableHandle {
        // A job already handed to `executeAfter` cannot be withdrawn: workers offer no cancellation API.
        // To limit what stays reachable, the real `block` (which may hold on to a lot) lives in a
        // clearable reference; after disposal only this empty shell remains queued.
        // It has to be a class: an object capturing `block` in a closure would keep it alive anyway.
        class DisposableBlock(block: Runnable) : DisposableHandle, Function0<Unit> {
            private val blockRef = AtomicReference<Runnable?>(block)

            val isDisposed: Boolean
                get() = blockRef.value == null

            override fun invoke() {
                blockRef.value?.run()
            }

            override fun dispose() {
                blockRef.value = null
            }
        }

        // Waits in slices of at most 100 ms, re-checking for disposal before each slice,
        // and only hands the task itself to the worker for the final slice.
        fun Worker.runAfterDelay(task: DisposableBlock, deadline: TimeMark) {
            if (task.isDisposed) return
            val remaining = -deadline.elapsedNow()
            val slice = 100.milliseconds
            if (remaining <= slice) {
                // The deadline may already have passed; never request a negative delay.
                executeAfter(remaining.inWholeMicroseconds.coerceAtLeast(0L), task)
            } else {
                executeAfter(slice.inWholeMicroseconds) { runAfterDelay(task, deadline) }
            }
        }

        val deadline = TimeSource.Monotonic.markNow() + timeMillis.milliseconds
        val task = DisposableBlock(block)
        worker.runAfterDelay(task, deadline)
        return task
    }

    /** Requests termination of the worker and blocks until the request has completed. */
    override fun close() {
        val termination = worker.requestTermination()
        termination.result // Note: reading "result" blocks
    }
}
77
/**
 * A closeable dispatcher that spreads tasks over a pool of at most [workersCount] workers.
 *
 * Tasks that find no idle worker are put into [tasksQueue]; workers that find no task park
 * their continuation in [availableWorkers]. A single atomic counter keeps both sides in agreement
 * about who owes what to whom.
 */
private class MultiWorkerDispatcher(
    private val name: String,
    private val workersCount: Int
) : CloseableCoroutineDispatcher() {
    private val tasksQueue = Channel<Runnable>(Channel.UNLIMITED)
    private val availableWorkers = Channel<CancellableContinuation<Runnable>>(Channel.UNLIMITED)
    private val workerPool = OnDemandAllocatingPool(workersCount) { index ->
        Worker.start(name = "$name-$index").also { worker ->
            worker.executeAfter { workerRunLoop() }
        }
    }

    /**
     * Encodes `(number of tasks - number of workers) * 2 + (1 if closed)`:
     * the lowest bit is the "closed" flag, the remaining value is the task/worker balance.
     */
    private val tasksAndWorkersCounter = atomic(0L)

    /** `true` when the "closed" bit is set. */
    @Suppress("NOTHING_TO_INLINE")
    private inline fun Long.isClosed() = (this and 1L) != 0L

    /** `true` when there is at least one more task than workers. */
    @Suppress("NOTHING_TO_INLINE")
    private inline fun Long.hasTasks() = this >= 2L

    /** `true` when there are more waiting workers than tasks. */
    @Suppress("NOTHING_TO_INLINE")
    private inline fun Long.hasWorkers() = this < 0L

    /**
     * The body executed by every worker: repeatedly take a task (or park until one is handed over)
     * and run it, until the dispatcher is closed and no tasks remain.
     */
    private fun workerRunLoop() = runBlocking {
        while (true) {
            val previous = tasksAndWorkersCounter.getAndUpdate { current ->
                // Closed and nothing left to process: this worker is done.
                if (current.isClosed() && !current.hasTasks()) return@runBlocking
                current - 2
            }
            if (previous.hasTasks()) {
                // The decrement claimed an existing task, so one is (or is about to be) in the queue.
                tasksQueue.receive().run()
            } else {
                try {
                    // No task to claim: register as idle and wait until `dispatch` resumes us with one.
                    val task = suspendCancellableCoroutine<Runnable> { idleWorker ->
                        checkChannelResult(availableWorkers.trySend(idleWorker))
                    }
                    task.run()
                } catch (e: CancellationException) {
                    // Cancelled from `close`, so this branch will never be reached again;
                    // pending work may still exist though, hence we loop instead of exiting here.
                }
            }
        }
    }

    /**
     * Takes an idle worker's continuation. The counter guarantees that such a worker has promised
     * to register itself, so if it has not arrived yet we block until it does.
     */
    private fun obtainWorker(): CancellableContinuation<Runnable> {
        val parked = availableWorkers.tryReceive().getOrNull()
        if (parked != null) return parked
        return runBlocking { availableWorkers.receive() }
    }

    /**
     * Hands [block] to an idle worker if the counter says one exists; otherwise makes sure
     * a worker is allocated and queues the task.
     *
     * @throws IllegalStateException if the dispatcher has already been closed.
     */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        val previous = tasksAndWorkersCounter.getAndUpdate { current ->
            check(!current.isClosed()) { "Dispatcher $name was closed, attempted to schedule: $block" }
            current + 2
        }
        if (!previous.hasWorkers()) {
            // Nobody is idle: request a worker from the pool and enqueue the task for it.
            workerPool.allocate()
            checkChannelResult(tasksQueue.trySend(block))
            return
        }
        // An idle worker is (or soon will be) parked; wake it up with the task.
        obtainWorker().resume(block)
    }

    /**
     * Returns this dispatcher (optionally named) when [parallelism] does not actually restrict it,
     * and defers to the default implementation otherwise.
     */
    override fun limitedParallelism(parallelism: Int, name: String?): CoroutineDispatcher {
        parallelism.checkParallelism()
        return if (parallelism >= workersCount) {
            namedOrThis(name)
        } else {
            super.limitedParallelism(parallelism, name)
        }
    }

    /**
     * Marks the dispatcher as closed, wakes up every parked worker and blocks until
     * all workers have terminated.
     */
    override fun close() {
        tasksAndWorkersCounter.getAndUpdate { current ->
            if (current.isClosed()) current else current or 1L
        }
        val workers = workerPool.close() // no new workers will be created
        while (true) {
            // Idle workers are suspended waiting for a task; account for each one and cancel its wait.
            val previous = tasksAndWorkersCounter.getAndUpdate { current ->
                if (current.hasWorkers()) current + 2 else current
            }
            if (previous.hasWorkers()) {
                obtainWorker().cancel()
            } else {
                break
            }
        }
        /*
         * Waiting on `.result` is unavoidable here: skipping it would leak native memory,
         * including a pthread handle.
         */
        val terminations = workers.map { worker -> worker.requestTermination() }
        terminations.forEach { termination -> termination.result }
    }

    /** Fails with [IllegalStateException] if a channel operation that must always succeed did not. */
    private fun checkChannelResult(result: ChannelResult<*>) {
        if (result.isSuccess) return
        throw IllegalStateException(
            "Internal invariants of $this were violated, please file a bug to kotlinx.coroutines",
            result.exceptionOrNull()
        )
    }
}
182