/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5 package kotlinx.coroutines
6
7 import kotlinx.coroutines.internal.*
8 import org.w3c.dom.*
9 import kotlin.coroutines.*
10 import kotlin.js.Promise
11
// Largest delay that can be handed to the timer functions below, which take an Int timeout.
private const val MAX_DELAY = Int.MAX_VALUE.toLong()

/**
 * Converts a delay in milliseconds to an [Int], clamping it to the `0..Int.MAX_VALUE` range:
 * negative delays become `0` and delays above [MAX_DELAY] become [Int.MAX_VALUE].
 */
private fun delayToInt(timeMillis: Long): Int = when {
    timeMillis <= 0L -> 0
    timeMillis >= MAX_DELAY -> Int.MAX_VALUE
    else -> timeMillis.toInt()
}
16
/**
 * Base class for dispatchers that batch dispatched blocks in a [MessageQueue] and implement
 * [Delay] on top of the global `setTimeout`/`clearTimeout` functions.
 *
 * Subclasses decide how the first processing pass of the queue is scheduled by implementing
 * [scheduleQueueProcessing]; every follow-up pass is scheduled with `setTimeout(..., 0)`.
 */
internal sealed class SetTimeoutBasedDispatcher : CoroutineDispatcher(), Delay {
    /**
     * Message queue whose initial scheduling is delegated to the enclosing dispatcher.
     */
    inner class ScheduledMessageQueue : MessageQueue() {
        // Function value that runs one processing pass; handed to the scheduling functions as a callback.
        internal val processQueue: dynamic = { process() }

        override fun schedule() = scheduleQueueProcessing()

        override fun reschedule() {
            setTimeout(processQueue, 0)
        }
    }

    internal val messageQueue = ScheduledMessageQueue()

    /**
     * Schedules the initial processing pass of [messageQueue].
     */
    abstract fun scheduleQueueProcessing()

    // The parallelism value is only validated; the same dispatcher instance is returned.
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        also { parallelism.checkParallelism() }

    override fun dispatch(context: CoroutineContext, block: Runnable) = messageQueue.enqueue(block)

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        ClearTimeout(setTimeout({ block.run() }, delayToInt(timeMillis)))

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val timerId = setTimeout({ continuation.run { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
        // Actually on cancellation, but clearTimeout is idempotent
        val cancelTimer = ClearTimeout(timerId)
        continuation.invokeOnCancellation(handler = cancelTimer.asHandler)
    }
}
54
/**
 * [SetTimeoutBasedDispatcher] that schedules the initial queue processing pass with `process.nextTick`.
 */
internal object NodeDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue: dynamic = messageQueue.processQueue
        process.nextTick(drainQueue)
    }
}
60
/**
 * [SetTimeoutBasedDispatcher] that schedules the initial queue processing pass with a zero-delay `setTimeout`.
 */
internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue: dynamic = messageQueue.processQueue
        setTimeout(drainQueue, 0)
    }
}
66
/**
 * Clears the timer identified by [handle] through the global `clearTimeout`.
 *
 * The same object serves both as a [DisposableHandle] and as a cancellation handler;
 * being invoked as a handler simply disposes the timer, whatever the cause.
 */
private class ClearTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {

    override fun dispose() = clearTimeout(handle)

    override fun invoke(cause: Throwable?) = dispose()

    override fun toString(): String {
        return "ClearTimeout[$handle]"
    }
}
79
/**
 * Dispatcher bound to a specific browser [window].
 *
 * Dispatched blocks are batched in a [WindowMessageQueue]; delays and timeouts are implemented
 * with the timer functions of that same [window].
 */
internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
    private val queue = WindowMessageQueue(window)

    override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)

    /**
     * Resumes [continuation] after [timeMillis] milliseconds (clamped by [delayToInt]).
     *
     * The timer is cleared when the continuation is cancelled, so a cancelled delay does not keep
     * the timer and the continuation it captures alive until the original deadline.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val handle = window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
        // Clearing a timer that has already fired is harmless, so no extra state tracking is needed.
        continuation.invokeOnCancellation(handler = ClearWindowTimeout(handle).asHandler)
    }

    /**
     * Runs [block] after [timeMillis] milliseconds (clamped by [delayToInt]).
     *
     * @return a handle whose disposal clears the underlying timer.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
        return ClearWindowTimeout(handle)
    }

    /**
     * Clears the timer [handle] on the dispatcher's own [window]. The timer was created through
     * that window, so it is cleared through the same window rather than the global `clearTimeout`.
     */
    private inner class ClearWindowTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {
        override fun dispose() {
            window.clearTimeout(handle)
        }

        override fun invoke(cause: Throwable?) {
            dispose()
        }

        override fun toString(): String = "ClearWindowTimeout[$handle]"
    }
}
98
/**
 * [MessageQueue] for a browser [window].
 *
 * The initial processing pass is scheduled through a resolved [Promise]; follow-up passes are
 * triggered by posting a marker message to the window and handling it in a "message" listener.
 */
private class WindowMessageQueue(private val window: Window) : MessageQueue() {
    // Payload that identifies the messages posted by this queue.
    private val messageName = "dispatchCoroutine"

    init {
        window.addEventListener("message", { event: dynamic -> onMessage(event) }, true)
    }

    // Handles a "message" event: anything that is not this queue's own marker message is ignored.
    private fun onMessage(event: dynamic) {
        if (event.source != window || event.data != messageName) return
        event.stopPropagation()
        process()
    }

    override fun schedule() {
        val resolved = Promise.resolve(Unit)
        resolved.then({ process() })
    }

    override fun reschedule() {
        window.postMessage(messageName, "*")
    }
}
119
/**
 * An abstraction over the JS scheduling mechanism that micro-batches dispatched blocks, so that
 * the cost of scheduling a JS callback is not paid on every single dispatch.
 *
 * The queue relies on two scheduling mechanisms:
 * 1) [schedule] schedules the initial processing of the message queue.
 *    A JS engine-specific microtask mechanism is used in order to boost performance on short runs
 *    and a dispatch batch.
 * 2) [reschedule] schedules further processing of the queue after yielding to the JS event loop.
 *    A JS engine-specific macrotask mechanism is used so that animations and non-coroutine
 *    macrotasks are not starved.
 *
 * There can still be a long tail of "slow" reschedules, but it should be amortized by the queue size.
 */
internal abstract class MessageQueue : ArrayQueue<Runnable>() {
    // Number of messages processed in one pass before yielding to the JS macrotask event loop.
    val yieldEvery = 16

    // True while a processing pass is pending, i.e. between the first enqueue and the queue being drained.
    private var scheduled = false

    /** Schedules the initial processing pass of the queue. */
    abstract fun schedule()

    /** Schedules a follow-up processing pass after the current one has yielded. */
    abstract fun reschedule()

    /**
     * Appends [element] to the queue and schedules processing unless a pass is already pending.
     */
    fun enqueue(element: Runnable) {
        addLast(element)
        if (scheduled) return
        scheduled = true
        schedule()
    }

    /**
     * Runs up to [yieldEvery] queued messages. Afterwards, even if a message has thrown, either
     * marks the queue as idle (when it is empty) or asks for another pass via [reschedule].
     */
    fun process() {
        try {
            // limit number of processed messages
            var budget = yieldEvery
            while (budget > 0) {
                val next = removeFirstOrNull() ?: return
                next.run()
                budget--
            }
        } finally {
            if (!isEmpty) {
                reschedule()
            } else {
                scheduled = false
            }
        }
    }
}
164
// The timer functions are bound to the global setTimeout and clearTimeout instead of being reached
// through "window": "window" only exists in a browser, while the globals also work on Node.JS.
private external fun setTimeout(
    handler: dynamic,
    timeout: Int = definedExternally
): Int

private external fun clearTimeout(
    handle: Int = definedExternally
)
169