• Home
  • Line#
  • Scopes#
  • Navigate#
  • Raw
  • Download
/*
 * Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4 
5 package kotlinx.coroutines
6 
7 import kotlinx.coroutines.internal.*
8 import org.w3c.dom.*
9 import kotlin.coroutines.*
10 import kotlin.js.Promise
11 
// Upper bound, in milliseconds, for a delay: the largest value that still fits into an Int.
private const val MAX_DELAY: Long = Int.MAX_VALUE.toLong()

/**
 * Converts a delay given in milliseconds to the [Int] timeout used by the timer calls in this file.
 *
 * Negative values are clamped to `0`; values above [MAX_DELAY] are clamped to [Int.MAX_VALUE].
 */
private fun delayToInt(timeMillis: Long): Int = when {
    timeMillis <= 0L -> 0
    timeMillis >= MAX_DELAY -> Int.MAX_VALUE
    else -> timeMillis.toInt()
}
16 
/**
 * Base class for the dispatchers in this file that implement [Delay] on top of the global `setTimeout`.
 *
 * Dispatched blocks are appended to [messageQueue]; subclasses choose, through [scheduleQueueProcessing],
 * how the first processing pass over that queue is scheduled.
 */
internal sealed class SetTimeoutBasedDispatcher : CoroutineDispatcher(), Delay {
    /**
     * Message queue whose initial pass is scheduled by the enclosing dispatcher and whose
     * follow-up passes are scheduled with `setTimeout(..., 0)`.
     */
    inner class ScheduledMessageQueue : MessageQueue() {
        // Typed as dynamic: this value is what gets handed to setTimeout below and to the
        // scheduling calls made by the subclasses.
        internal val processQueue: dynamic = { process() }

        override fun schedule() = scheduleQueueProcessing()

        override fun reschedule() {
            setTimeout(processQueue, 0)
        }
    }

    internal val messageQueue = ScheduledMessageQueue()

    /** Schedules the first processing pass of [messageQueue]. */
    abstract fun scheduleQueueProcessing()

    /** Validates [parallelism] and returns this same dispatcher. */
    override fun limitedParallelism(parallelism: Int): CoroutineDispatcher =
        also { parallelism.checkParallelism() }

    /** Enqueues [block] into [messageQueue]; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) = messageQueue.enqueue(block)

    /**
     * Runs [block] after [timeMillis] milliseconds (clamped by [delayToInt]) and returns a handle
     * that clears the underlying timer when disposed.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
        ClearTimeout(setTimeout({ block.run() }, delayToInt(timeMillis)))

    /**
     * Resumes [continuation] after [timeMillis] milliseconds (clamped by [delayToInt]) and registers
     * a handler on the continuation that clears the timer.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val resume = { with(continuation) { resumeUndispatched(Unit) } }
        val timerId = setTimeout(resume, delayToInt(timeMillis))
        // Meant for cancellation; per the original note, clearTimeout is idempotent.
        continuation.invokeOnCancellation(handler = ClearTimeout(timerId).asHandler)
    }
}
54 
/** [SetTimeoutBasedDispatcher] that schedules the first pass over its queue via `process.nextTick`. */
internal object NodeDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue = messageQueue.processQueue
        process.nextTick(drainQueue)
    }
}
60 
/** [SetTimeoutBasedDispatcher] that schedules the first pass over its queue via `setTimeout(..., 0)`. */
internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() {
    override fun scheduleQueueProcessing() {
        val drainQueue = messageQueue.processQueue
        setTimeout(drainQueue, 0)
    }
}
66 
/**
 * Clears the timer identified by [handle] through the global `clearTimeout`, either when disposed
 * as a [DisposableHandle] or when invoked as a [CancelHandler].
 */
private class ClearTimeout(private val handle: Int) : CancelHandler(), DisposableHandle {

    override fun dispose() = clearTimeout(handle)

    // The cause is ignored: the timer is cleared no matter why the handler was invoked.
    override fun invoke(cause: Throwable?) = dispose()

    override fun toString(): String {
        return "ClearTimeout[$handle]"
    }
}
79 
/**
 * Dispatcher bound to a specific browser [window].
 *
 * Blocks are dispatched through a [WindowMessageQueue] attached to the window, and delays are
 * implemented with that window's own `setTimeout` / `clearTimeout`.
 */
internal class WindowDispatcher(private val window: Window) : CoroutineDispatcher(), Delay {
    private val queue = WindowMessageQueue(window)

    /** Enqueues [block] into the window's message queue; [context] is not used. */
    override fun dispatch(context: CoroutineContext, block: Runnable) = queue.enqueue(block)

    /**
     * Resumes [continuation] after [timeMillis] milliseconds (clamped by [delayToInt]).
     *
     * The timer is cleared if the continuation is cancelled, so a cancelled delay does not leave
     * a pending timer (and its captured continuation) behind until the timeout elapses.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val handle = window.setTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis))
        // Use the window's own clearTimeout: the handle was issued by this window's setTimeout.
        continuation.invokeOnCancellation { window.clearTimeout(handle) }
    }

    /**
     * Runs [block] after [timeMillis] milliseconds (clamped by [delayToInt]).
     *
     * @return a handle whose `dispose()` clears the underlying window timer.
     */
    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        val handle = window.setTimeout({ block.run() }, delayToInt(timeMillis))
        return object : DisposableHandle {
            override fun dispose() {
                window.clearTimeout(handle)
            }
        }
    }
}
98 
/**
 * [MessageQueue] bound to a browser [window].
 *
 * The first processing pass is scheduled through a resolved [Promise]; follow-up passes are
 * requested by posting [messageName] to the window and handled by the "message" listener
 * registered in `init`.
 */
private class WindowMessageQueue(private val window: Window) : MessageQueue() {
    private val messageName = "dispatchCoroutine"

    init {
        // Registered with `true` as the third argument, as in the original call.
        window.addEventListener("message", { message: dynamic ->
            // React only to messages posted by this window that carry our marker payload.
            val isOwnWakeUp = message.source == window && message.data == messageName
            if (isOwnWakeUp) {
                message.stopPropagation()
                process()
            }
        }, true)
    }

    override fun schedule() {
        val resolved = Promise.resolve(Unit)
        resolved.then({ process() })
    }

    override fun reschedule() {
        window.postMessage(messageName, "*")
    }
}
119 
/**
 * A queue of dispatched blocks layered over the JS scheduling mechanisms. It micro-batches
 * dispatched blocks so that a JS callback does not have to be scheduled for every single dispatch.
 *
 * Two scheduling hooks are used:
 * 1) [schedule] arranges the initial processing pass of the queue.
 *    A JS engine-specific microtask mechanism is used to boost performance on short runs and a dispatch batch.
 * 2) [reschedule] arranges a further pass after yielding to the JS event loop.
 *    A JS engine-specific macrotask mechanism is used so that animations and non-coroutine macrotasks
 *    are not starved.
 *
 * A long tail of "slow" reschedules is still possible, but it should be amortized by the queue size.
 */
internal abstract class MessageQueue : ArrayQueue<Runnable>() {
    // Number of messages handled in one pass before yielding to the JS macrotask event loop.
    val yieldEvery = 16

    // Set by enqueue() when a pass is requested; reset by process() once the queue is drained.
    private var scheduled = false

    /** Schedules the initial processing pass. */
    abstract fun schedule()

    /** Schedules a follow-up processing pass when a pass ends with messages still queued. */
    abstract fun reschedule()

    /** Appends [element] and requests a processing pass unless one is already scheduled. */
    fun enqueue(element: Runnable) {
        addLast(element)
        if (scheduled) return
        scheduled = true
        schedule()
    }

    /**
     * Runs up to [yieldEvery] queued messages, stopping early when the queue runs dry.
     *
     * Whether the pass completes or a message throws, the `finally` block clears the scheduled
     * flag if the queue is empty, and otherwise calls [reschedule].
     */
    fun process() {
        try {
            var budget = yieldEvery
            while (budget > 0) {
                val next = removeFirstOrNull() ?: return
                next.run()
                budget--
            }
        } finally {
            if (!isEmpty) {
                reschedule()
            } else {
                scheduled = false
            }
        }
    }
}
164 
// The global setTimeout and clearTimeout are declared here, rather than reached through "window"
// (which exists only in a browser), so that these calls also work on Node.js.
private external fun setTimeout(handler: dynamic, timeout: Int = definedExternally): Int
private external fun clearTimeout(handle: Int = definedExternally)
169