/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.internal.*
8 import org.w3c.dom.*
9 import kotlin.coroutines.*
10 import kotlin.js.Promise
11
/** Upper bound, in milliseconds, for a delay that still fits into an [Int]. */
private const val MAX_DELAY: Long = Int.MAX_VALUE.toLong()

/**
 * Converts a delay given in milliseconds into an [Int] timeout value.
 *
 * Non-positive delays become `0`; delays of [MAX_DELAY] or more become [Int.MAX_VALUE].
 *
 * @param timeMillis requested delay in milliseconds, any [Long] value is accepted.
 * @return the delay clamped to the `0..Int.MAX_VALUE` range.
 */
private fun delayToInt(timeMillis: Long): Int = when {
    timeMillis <= 0L -> 0
    timeMillis >= MAX_DELAY -> Int.MAX_VALUE
    else -> timeMillis.toInt()
}
16
/**
 * Base class for JS dispatchers that collect dispatched blocks in a [MessageQueue] and implement
 * [Delay] through the global `setTimeout` function.
 *
 * Subclasses only choose how the initial processing of the queue is scheduled,
 * see [scheduleQueueProcessing].
 */
internal sealed class SetTimeoutBasedDispatcher : CoroutineDispatcher(), Delay {
    /** Message queue that delegates its initial scheduling to the enclosing dispatcher. */
    inner class ScheduledMessageQueue : MessageQueue() {
        /** Callback passed to the JS scheduling functions; each call runs one [process] pass. */
        internal val processQueue: dynamic = { process() }

        override fun schedule() = scheduleQueueProcessing()

        override fun reschedule() {
            // Follow-up passes always go through a zero-delay timeout.
            setTimeout(processQueue, 0)
        }
    }

    internal val messageQueue = ScheduledMessageQueue()

    /** Invoked by [ScheduledMessageQueue.schedule] to request the first processing pass of [messageQueue]. */
    abstract fun scheduleQueueProcessing()

    /**
     * Validates [parallelism] with `checkParallelism` and returns this very dispatcher;
     * no wrapping dispatcher is created.
     */
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        this.also { parallelism.checkParallelism() }

    /** Appends [block] to [messageQueue]; the [context] is not consulted. */
    override fun dispatch(context: CoroutineContext, block: Runnable) = messageQueue.enqueue(block)

    /**
     * Runs [block] from a `setTimeout` callback after [timeMillis] (clamped by [delayToInt]).
     *
     * @return a handle whose disposal clears the underlying timeout.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val delayMillis = delayToInt(timeMillis)
        val timerId = setTimeout({ block.run() }, delayMillis)
        return ClearTimeout(timerId)
    }

    /**
     * Resumes [continuation] undispatched from a `setTimeout` callback after [timeMillis]
     * (clamped by [delayToInt]) and clears the timeout if the continuation is cancelled.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val delayMillis = delayToInt(timeMillis)
        val timerId = setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayMillis)
        val cancelTimer = ClearTimeout(timerId)
        continuation.invokeOnCancellation(handler = cancelTimer.asHandler)
    }
}
53
/** [SetTimeoutBasedDispatcher] that requests the first queue pass through `process.nextTick`. */
internal object NodeDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue = messageQueue.processQueue
        process.nextTick(drainQueue)
    }
}
59
/** [SetTimeoutBasedDispatcher] that requests the first queue pass through a zero-delay `setTimeout`. */
internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue = messageQueue.processQueue
        setTimeout(drainQueue, 0)
    }
}
65
/**
 * Handle for a pending timeout that serves both as a cancellation handler and as a [DisposableHandle].
 *
 * @property handle the value passed to `clearTimeout` when the handle is disposed.
 */
private open class ClearTimeout(protected val handle: Int) : CancelHandler(), DisposableHandle {

    /** Clears the timeout identified by [handle] using the global `clearTimeout`. */
    override fun dispose() {
        clearTimeout(handle)
    }

    /** Cancellation callback: the [cause] is ignored and the timeout is simply disposed. */
    override fun invoke(cause: Throwable?) = dispose()

    override fun toString(): String {
        return "ClearTimeout[$handle]"
    }
}
78
/**
 * Dispatcher bound to a specific [window]: blocks are queued in a [WindowMessageQueue] and
 * delays are implemented with that window's own `setTimeout` / `clearTimeout`.
 */
internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
    private val queue = WindowMessageQueue(window)

    /** Appends [block] to the window message queue; the [context] is not consulted. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queue.enqueue(block)
    }

    /**
     * Resumes [continuation] undispatched from a `window.setTimeout` callback after [timeMillis]
     * (clamped by [delayToInt]) and clears the timeout if the continuation is cancelled.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val delayMillis = delayToInt(timeMillis)
        val timerId = window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayMillis)
        val cancelTimer = WindowClearTimeout(timerId)
        continuation.invokeOnCancellation(handler = cancelTimer.asHandler)
    }

    /**
     * Runs [block] from a `window.setTimeout` callback after [timeMillis] (clamped by [delayToInt]).
     *
     * @return a handle whose disposal clears the timeout on [window].
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val delayMillis = delayToInt(timeMillis)
        val timerId = window.setTimeout({ block.run() }, delayMillis)
        return WindowClearTimeout(timerId)
    }

    /** [ClearTimeout] variant that clears the timeout through [window] instead of the global function. */
    private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
        override fun dispose() {
            window.clearTimeout(handle)
        }
    }
}
100
/**
 * [MessageQueue] for a browser [window]: the first pass is scheduled through a resolved [Promise],
 * follow-up passes are triggered by posting a marker message to the window itself.
 */
private class WindowMessageQueue(private val window: Window) : MessageQueue() {
    private val messageName = "dispatchCoroutine"

    init {
        window.addEventListener("message", { event: dynamic ->
            // React only to the marker message that this very window posted to itself.
            if (event.source != window || event.data != messageName) return@addEventListener
            event.stopPropagation()
            process()
        }, true)
    }

    override fun schedule() {
        val resolved = Promise.resolve(Unit)
        // Keep the callback inside the parentheses so that it is bound to the first parameter of `then`.
        resolved.then({ process() })
    }

    override fun reschedule() {
        // Picked up by the "message" listener registered in the init block.
        window.postMessage(messageName, "*")
    }
}
121
/**
 * A queue of dispatched blocks that sits on top of a JS scheduling mechanism and processes the blocks
 * in small batches, so that a JS callback does not have to be scheduled for every single dispatch.
 *
 * Two scheduling hooks are used:
 * 1) [schedule] requests the initial processing of the queue.
 *    Implementations use a JS engine-specific microtask mechanism here to speed up short runs
 *    and batches of dispatches.
 * 2) [reschedule] requests further processing after yielding to the JS event loop.
 *    Implementations use a JS engine-specific macrotask mechanism here so that animations and
 *    non-coroutine macrotasks are not starved.
 *
 * There may still be a long tail of "slow" reschedules, but its cost should be amortized by the queue size.
 */
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
    /** Number of messages processed in one [process] pass before yielding to the JS macrotask event loop. */
    val yieldEvery = 16

    // True from the moment processing has been requested until a pass leaves the queue empty.
    private var scheduled = false

    /** Requests the initial processing pass; called when an element is enqueued and no pass is pending. */
    abstract fun schedule()

    /** Requests a follow-up processing pass; called when a pass ends with elements still queued. */
    abstract fun reschedule()

    /** Appends [element] to the queue and calls [schedule] unless processing is already pending. */
    fun enqueue(element: Runnable) {
        add(element)
        if (scheduled) return
        scheduled = true
        schedule()
    }

    /**
     * Runs up to [yieldEvery] queued elements in FIFO order, stopping early once the queue is drained.
     *
     * Afterwards, even if an element has thrown, the queue either marks itself as no longer scheduled
     * (when empty) or calls [reschedule] (when elements remain).
     */
    fun process() {
        try {
            // limit number of processed messages
            for (processed in 0 until yieldEvery) {
                val next = removeFirstOrNull() ?: break
                next.run()
            }
        } finally {
            if (isNotEmpty()) {
                reschedule()
            } else {
                scheduled = false
            }
        }
    }
}
166
// The global setTimeout and clearTimeout are declared here (instead of being reached through "window",
// which exists only in a browser) so that the same code also works on Node.JS.

/** Global `setTimeout`; the returned value is what is later passed to [clearTimeout]. */
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int

/** Global `clearTimeout` for a [handle] previously returned by [setTimeout]. */
private external fun clearTimeout(handle: Int = definedExternally)
171