• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import org.w3c.dom.*
9 import kotlin.coroutines.*
10 import kotlin.js.Promise
11 
/** Largest delay, in milliseconds, that is passed on to a timer: [Int.MAX_VALUE] widened to [Long]. */
private const val MAX_DELAY = Int.MAX_VALUE.toLong()

/**
 * Converts a delay given as [Long] milliseconds to an [Int].
 *
 * Negative values become `0` and values above [MAX_DELAY] become [Int.MAX_VALUE];
 * everything in between is returned unchanged.
 */
private fun delayToInt(timeMillis: Long): Int {
    val clamped = when {
        timeMillis < 0L -> 0L
        timeMillis > MAX_DELAY -> MAX_DELAY
        else -> timeMillis
    }
    return clamped.toInt()
}
16 
/**
 * Common base for dispatchers that run dispatched blocks through a [MessageQueue] and implement [Delay]
 * on top of the global `setTimeout` / `clearTimeout` functions.
 *
 * Subclasses only decide how the initial processing of the queue is requested, via [scheduleQueueProcessing].
 */
internal sealed class SetTimeoutBasedDispatcher : CoroutineDispatcher(), Delay {
    /** Message queue whose initial scheduling is delegated to the enclosing dispatcher. */
    inner class ScheduledMessageQueue : MessageQueue() {
        // Kept as a single `dynamic` value; this same callback is what gets passed to the JS scheduling calls.
        internal val processQueue: dynamic = { process() }

        override fun schedule() = scheduleQueueProcessing()

        override fun reschedule() {
            // Follow-up processing is requested through a zero-delay timer.
            setTimeout(processQueue, 0)
        }
    }

    internal val messageQueue = ScheduledMessageQueue()

    /** Requests the initial processing of [messageQueue]; called from [ScheduledMessageQueue.schedule]. */
    abstract fun scheduleQueueProcessing()

    /** Validates [parallelism] with `checkParallelism` and returns this very dispatcher. */
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
        parallelism.checkParallelism()
        return this
    }

    /** Adds [block] to [messageQueue]; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        messageQueue.enqueue(block)
    }

    /**
     * Starts a timer that runs [block] after [timeMillis] (clamped by `delayToInt`).
     *
     * @return a handle whose disposal clears the timer.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val delayMs = delayToInt(timeMillis)
        val timerId = setTimeout({ block.run() }, delayMs)
        return ClearTimeout(timerId)
    }

    /**
     * Starts a timer that resumes [continuation] undispatched after [timeMillis] (clamped by `delayToInt`),
     * and registers a cancellation handler that clears that timer.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val delayMs = delayToInt(timeMillis)
        val timerId = setTimeout({ continuation.run { resumeUndispatched(Unit) } }, delayMs)
        val cancelTimer = ClearTimeout(timerId)
        continuation.invokeOnCancellation(handler = cancelTimer.asHandler)
    }
}
53 
/** [SetTimeoutBasedDispatcher] that requests initial queue processing through `process.nextTick`. */
internal object NodeDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue = messageQueue.processQueue
        process.nextTick(drainQueue)
    }
}
59 
/** [SetTimeoutBasedDispatcher] that requests initial queue processing through a zero-delay `setTimeout`. */
internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue = messageQueue.processQueue
        setTimeout(drainQueue, 0)
    }
}
65 
/**
 * Wraps a timer [handle] so that it can be cleared either as a [DisposableHandle] or as a cancellation handler.
 *
 * Both entry points end up in [dispose], which subclasses may override to clear the timer differently.
 */
private open class ClearTimeout(protected val handle: Int) : CancelHandler(), DisposableHandle {

    /** Clears the timer identified by [handle] using the global `clearTimeout`. */
    override fun dispose() = clearTimeout(handle)

    /** Cancellation-handler entry point: ignores [cause] and delegates to [dispose]. */
    override fun invoke(cause: Throwable?) = dispose()

    override fun toString(): String {
        return "ClearTimeout[$handle]"
    }
}
78 
/**
 * Dispatcher bound to a particular [window]: dispatched blocks go through a [WindowMessageQueue] and
 * delays are implemented with `window.setTimeout` / `window.clearTimeout`.
 */
internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
    private val queue = WindowMessageQueue(window)

    /** Adds [block] to the window message queue; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        queue.enqueue(block)
    }

    /**
     * Starts a window timer that resumes [continuation] undispatched after [timeMillis] (clamped by `delayToInt`),
     * and registers a cancellation handler that clears that timer.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val delayMs = delayToInt(timeMillis)
        val timerId = window.setTimeout({ continuation.run { resumeUndispatched(Unit) } }, delayMs)
        val cancelTimer = WindowClearTimeout(timerId)
        continuation.invokeOnCancellation(handler = cancelTimer.asHandler)
    }

    /**
     * Starts a window timer that runs [block] after [timeMillis] (clamped by `delayToInt`).
     *
     * @return a handle whose disposal clears the timer.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val delayMs = delayToInt(timeMillis)
        val timerId = window.setTimeout({ block.run() }, delayMs)
        return WindowClearTimeout(timerId)
    }

    /** [ClearTimeout] variant that clears the timer through [window] rather than the global `clearTimeout`. */
    private inner class WindowClearTimeout(handle: Int) : ClearTimeout(handle) {
        override fun dispose() = window.clearTimeout(handle)
    }
}
100 
/**
 * [MessageQueue] driven by a [window]: initial processing is requested through a resolved [Promise],
 * follow-up processing through a message that the window posts to itself.
 */
private class WindowMessageQueue(private val window: Window) : MessageQueue() {
    // Payload that marks a posted message as a request to process this queue.
    private val messageName = "dispatchCoroutine"

    init {
        // Registered with `true` as the third argument, exactly as before.
        window.addEventListener("message", { event: dynamic ->
            val isOwnMessage = event.source == window && event.data == messageName
            if (isOwnMessage) {
                // Handle only our own marker message and stop it from propagating further.
                event.stopPropagation()
                process()
            }
        }, true)
    }

    override fun schedule() {
        val resolved = Promise.resolve(Unit)
        // The callback is deliberately passed in parentheses, as the first argument of `then`.
        resolved.then({ process() })
    }

    override fun reschedule() {
        window.postMessage(messageName, "*")
    }
}
121 
/**
 * A queue of dispatched blocks that are executed in small batches, so that a JS callback does not have to be
 * scheduled for every single dispatch.
 *
 * Subclasses provide two scheduling hooks:
 * 1) [schedule] requests the initial processing of the queue; [enqueue] invokes it when no processing is
 *    currently pending. Implementations are expected to use a JS engine-specific microtask mechanism to keep
 *    short runs and dispatch batches fast.
 * 2) [reschedule] requests further processing after [process] has stopped while blocks are still queued.
 *    Implementations are expected to use a JS engine-specific macrotask mechanism, so that animations and
 *    non-coroutine macrotasks are not starved.
 *
 * There may still be a long tail of "slow" reschedules, but their cost should be amortized by the queue size.
 */
internal abstract class MessageQueue : MutableList<Runnable> by ArrayDeque() {
    /** Maximum number of blocks a single [process] call runs before yielding to the JS macrotask event loop. */
    val yieldEvery = 16

    // Set when `schedule` has been requested; cleared once `process` finishes with an empty queue.
    private var scheduled = false

    /** Requests the initial processing of the queue. */
    abstract fun schedule()

    /** Requests another [process] pass when blocks are left over after the current one. */
    abstract fun reschedule()

    /** Appends [element] to the queue and calls [schedule] unless processing is already pending. */
    fun enqueue(element: Runnable) {
        add(element)
        if (scheduled) return
        scheduled = true
        schedule()
    }

    /**
     * Runs queued blocks in order, at most [yieldEvery] of them, stopping early when the queue runs dry.
     *
     * On the way out (including when a block throws) the queue either marks itself idle, if it is empty,
     * or calls [reschedule] to have the remaining blocks processed later.
     */
    fun process() {
        try {
            var budget = yieldEvery
            while (budget > 0) {
                val next = removeFirstOrNull() ?: return
                next.run()
                budget--
            }
        } finally {
            if (!isEmpty()) {
                reschedule()
            } else {
                scheduled = false
            }
        }
    }
}
166 
// These bind to the global setTimeout and clearTimeout rather than to the ones reachable through "window":
// the global functions also work on Node.JS, whereas "window" only exists in a browser.

// Starts a timer for `handler` and returns its handle.
private external fun setTimeout(
    handler: dynamic,
    timeout: Int = definedExternally
): Int

// Clears the timer identified by `handle`.
private external fun clearTimeout(
    handle: Int = definedExternally
)
171